使用 TensorFlow 进行分布式训练

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述

tf.distribute.Strategy 是一个可在多个 GPU、多台机器或 TPU 上进行分布式训练的 TensorFlow API。使用此 API,您只需改动较少代码就能分布现有模型和训练代码。

tf.distribute.Strategy 旨在实现以下目标:

  • 易于使用,支持多种用户(包括研究人员和 ML 工程师等)。
  • 提供开箱即用的良好性能。
  • 轻松切换策略。

tf.distribute.Strategy 可用于 Keras 等高级 API,也可用来分布自定义训练循环(以及,一般来说,使用 TensorFlow 的任何计算)。

在 TensorFlow 2.x 中,您可以立即执行程序,也可以使用 tf.function 在计算图中执行。虽然 tf.distribute.Strategy 对两种执行模式都支持,但使用 tf.function 效果最佳。建议仅将 Eager 模式用于调试,而 TPUStrategy 不支持此模式。尽管本指南大部分时间在讨论训练,但该 API 也可用于在不同平台上分布评估和预测。

您在使用 tf.distribute.Strategy 时只需改动少量代码,因为我们修改了 TensorFlow 的底层组件,使其可感知策略。这些组件包括变量、层、优化器、指标、摘要和检查点。

在本指南中,我们将介绍各种类型的策略,以及如何在不同情况下使用它们。

注:为了更深入地理解这些概念,请观看此深入演示。如果您计划编写自己的训练循环,尤其建议您观看此视频。

# Import TensorFlow
import tensorflow as tf

策略类型

tf.distribute.Strategy 打算涵盖不同轴上的许多用例。目前已支持其中的部分组合,将来还会添加其他组合。其中一些轴包括:

  • 同步和异步训练:这是通过数据并行进行分布式训练的两种常用方法。在同步训练中,所有工作进程都同步地对输入数据的不同片段进行训练,并且会在每一步中聚合梯度。在异步训练中,所有工作进程都独立训练输入数据并异步更新变量。通常情况下,同步训练通过全归约实现,而异步训练通过参数服务器架构实现。
  • 硬件平台:您可能需要将训练扩展到一台机器上的多个 GPU 或一个网络中的多台机器(每台机器拥有 0 个或多个 GPU),或扩展到 Cloud TPU 上。

要支持这些用例,有六种策略可选。在下一部分,我们将说明当前在 TF 2.2 的哪些场景中支持哪些策略。以下为快速概览:

训练 API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras API 支持 支持 实验性支持 实验性支持 计划于 2.3 后支持
自定义训练循环 支持 支持 实验性支持 实验性支持 计划于 2.3 后支持
Estimator API 有限支持 不支持 有限支持 有限支持 有限支持

注:实验性支持指不保证该 API 的兼容性。

注:对 Estimator 提供有限支持。基本训练和评估都是实验性的,而未实现高级功能(如基架)。如未涵盖某一用例,建议您使用 Keras 或自定义训练循环。

MirroredStrategy

tf.distribute.MirroredStrategy 支持在一台机器的多个 GPU 上进行同步分布式训练。该策略会为每个 GPU 设备创建一个副本。模型中的每个变量都会在所有副本之间进行镜像。这些变量将共同形成一个名为 MirroredVariable 的单个概念变量。这些变量会通过应用相同的更新彼此保持同步。

高效的全归约算法用于在设备之间传递变量更新。全归约算法通过加总各个设备上的张量使其聚合,并使其在每个设备上可用。这是一种非常高效的融合算法,可以显著减少同步开销。根据设备之间可用的通信类型,可以使用的全归约算法和实现方法有很多。默认使用 NVIDIA NCCL 作为全归约实现。您可以选择我们提供的其他选项,也可以自己编写。

以下是创建 MirroredStrategy 最简单的方法:

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

这会创建一个 MirroredStrategy 实例,该实例使用所有对 TensorFlow 可见的 GPU,并使用 NCCL 进行跨设备通信。

如果您只想使用机器上的部分 GPU,您可以这样做:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:0,/job:localhost/replica:0/task:0/device:GPU:1
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

如果您想重写跨设备通信,可以通过提供 tf.distribute.CrossDeviceOps 的实例,使用 cross_device_ops 参数来实现。目前,除了默认选项 tf.distribute.NcclAllReduce 外,还有 tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDevice 两个选项。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPUStrategy

您可以使用 tf.distribute.experimental.TPUStrategy 在张量处理单元 (TPU) 上运行 TensorFlow 训练。TPU 是 Google 的专用 ASIC,旨在显著加速机器学习工作负载。您可通过 Google Colab、TensorFlow Research CloudCloud TPU 平台进行使用。

就分布式训练架构而言,TPUStrategyMirroredStrategy 是一样的,即实现同步分布式训练。TPU 会在多个 TPU 核心之间实现高效的全归约和其他集合运算,并将其用于 TPUStrategy

下面演示了如何将 TPUStrategy 实例化:

注:要在 Colab 中运行此代码,应将 TPU 作为 Colab 运行时。具体请参阅 TensorFlow TPU 指南

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(     tpu=tpu_address) tf.config.experimental_connect_to_cluster(cluster_resolver) tf.tpu.experimental.initialize_tpu_system(cluster_resolver) tpu_strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)

TPUClusterResolver 实例可帮助定位 TPU。在 Colab 中,您无需为其指定任何参数。

如果要将其用于 Cloud TPU,您必须:

  • tpu 参数中指定 TPU 资源的名称。
  • 在程序开始时显式地初始化 TPU 系统。这是使用 TPU 进行计算前的必须步骤。初始化 TPU 系统还会清除 TPU 内存,所以为了避免丢失状态,请务必先完成此步骤。

MultiWorkerMirroredStrategy

tf.distribute.experimental.MultiWorkerMirroredStrategyMirroredStrategy 非常相似。它实现了跨多个工作进程的同步分布式训练,而每个工作进程可能有多个 GPU。与 MirroredStrategy 类似,它也会跨所有工作进程在每个设备的模型中创建所有变量的副本。

它使用 CollectiveOps 作为多工作进程全归约通信方法,用于保持变量同步。集合运算是 TensorFlow 计算图中的单个运算,它可以根据硬件、网络拓扑和张量大小在 TensorFlow 运行期间自动选择全归约算法。

它还实现了其他性能优化。例如,静态优化,可以将小张量上的多个全归约转化为大张量上较少的全归约。此外,我们还在为它设计插件架构,这样您将来就能以插件的形式使用针对您的硬件进行了更好优化的算法。请注意,集合运算还可以实现其他集合运算,比如广播和全收集。

以下是创建 MultiWorkerMirroredStrategy 最简单的方法:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategy 目前为您提供两种不同的集合运算实现方法。CollectiveCommunication.RING 通过将 gRPC 用作通信层来实现基于环的集合。CollectiveCommunication.NCCL 使用 NVIDIA 的 NCCL 来实现集合。CollectiveCommunication.AUTO 会将选择推迟到运行时。集合实现的最佳选择取决于 GPU 的数量和种类,以及集群中的网络互连。您可以通过以下方式来指定:

multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
    tf.distribute.experimental.CollectiveCommunication.NCCL)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.NCCL

与多 GPU 训练相比,多工作进程训练的一个主要差异是多工作进程的设置。TF_CONFIG 环境变量是在 TensorFlow 中为作为集群一部分的每个工作进程指定集群配置的标准方法。详细了解如何设置 TF_CONFIG

注:此策略处于 experimental 阶段,我们目前正在进行改进,使其能够用于更多场景。敬请期待 API 的未来变化。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy 也执行同步训练。变量不会被镜像,而是放在 CPU 上,且运算会复制到所有本地 GPU 。如果只有一个 GPU,则所有变量和运算都将被放在该 GPU 上。

请通过以下代码,创建 CentralStorageStrategy 实例:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

这会创建一个 CentralStorageStrategy 实例,该实例将使用所有可见的 GPU 和 CPU。在副本上对变量的更新将先进行聚合,然后再应用于变量。

注:此策略处于 experimental 阶段,我们目前正在进行改进,使其能够用于更多场景。敬请期待 API 的未来变化。

ParameterServerStrategy

tf.distribute.experimental.ParameterServerStrategy 支持在多台机器上进行参数服务器训练。在此设置中,有些机器会被指定为工作进程,有些会被指定为参数服务器。模型的每个变量都会被放在参数服务器上。计算会被复制到所有工作进程的所有 GPU 中。

就代码而言,该策略看起来与其他策略类似:

ps_strategy = tf.distribute.experimental.ParameterServerStrategy()

对于多工作进程训练,TF_CONFIG 需要在集群中指定参数服务器和工作进程的配置,有关详细信息,可以阅读下面的 TF_CONFIG

注:该策略仅适用于 Estimator API。

其他策略

除上述策略外,还有其他两种策略可能对使用 tf.distribute API 进行原型设计和调试有所帮助。

默认策略

默认策略是一种分布式策略,当作用域内没有显式分布策略时就会出现。此策略会实现 tf.distribute.Strategy 接口,但只具有传递功能,不提供实际分布。例如,strategy.run(fn) 只会调用 fn。使用该策略编写的代码与未使用任何策略编写的代码完全一样。您可以将其视为“无运算”策略。

默认策略是一种单一实例,无法创建它的更多实例。可通过在任意显式策略的作用域(与可用于在显式策略的作用域内获得当前策略的 API 相同)外使用 tf.distribute.get_strategy() 获得该策略。

default_strategy = tf.distribute.get_strategy()

该策略有两个主要用途:

  • 它允许无条件编写可感知分布的库代码。例如,在优化器中,我们可以执行 tf.distribute.get_strategy() 并使用该策略来减少梯度,而它将始终返回一个我们可以在其上调用归约 API 的策略对象。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1.)  # reduce some values
1.0
  • 与库代码类似,它可以用来在使用或不使用分布策略的情况下编写最终用户的程序,而无需条件逻辑。以下示例代码段展示了这一点:
if tf.config.list_physical_devices('gpu'):
  strategy = tf.distribute.MirroredStrategy()
else:  # use default strategy
  strategy = tf.distribute.get_strategy() 

with strategy.scope():
  # do something interesting
  print(tf.Variable(1.))
<tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>

OneDeviceStrategy

tf.distribute.OneDeviceStrategy 是一种会将所有变量和计算放在单个指定设备上的策略。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

此策略与默认策略在诸多方面存在差异。在默认策略中,与没有任何分布策略的 TensorFlow 运行相比,变量放置逻辑保持不变。但是当使用 OneDeviceStrategy 时,在其作用域内创建的所有变量都会被显式地放在指定设备上。此外,通过 OneDeviceStrategy.run 调用的任何函数也会被放在指定设备上。

通过该策略分布的输入将被预提取到指定设备。而在默认策略中,则没有输入分布。

与默认策略类似,在切换到实际分布到多个设备/机器的其他策略之前,也可以使用此策略来测试代码。这将比默认策略更多地使用分布策略机制,但不能像使用 MirroredStrategyTPUStrategy 等策略那样充分发挥其作用。如果您想让代码表现地像没有策略,请使用默认策略。

目前为止,我们已经讨论了可用的不同策略以及如何将其实例化。在接下来的几个部分中,我们将讨论使用它们分布训练的不同方法。我们将在本指南中展示简短的代码段,并附上可以从头到尾运行的完整教程的链接。

tf.keras.Model.fit 中使用 tf.distribute.Strategy

我们已将 tf.distribute.Strategy 集成到 tf.keras(TensorFlow 对 Keras API 规范的实现)。tf.keras 是用于构建和训练模型的高级 API。将该策略集成到 tf.keras 后端以后,您可以使用 model.fit 在 Keras 训练框架中无缝进行分布式训练。

您需要对代码进行以下更改:

  1. 创建一个合适的 tf.distribute.Strategy 实例。
  2. 将 Keras 模型、优化器和指标的创建转移到 strategy.scope 中。

我们支持所有类型的 Keras 模型:序贯模型、函数式模型和子类化模型。

下面是一段代码,执行该代码会创建一个非常简单的带有一个密集层的 Keras 模型:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

在此示例中我们使用了 MirroredStrategy,因此我们可以在有多个 GPU 的机器上运行。strategy.scope() 会指示 Keras 使用哪个策略来进行分布式训练。我们可以通过在此作用域内创建模型/优化器/指标来创建分布式变量而非常规变量。设置完成后,您就可以像平常一样拟合模型。MirroredStrategy 负责将模型的训练复制到可用的 GPU 上,以及聚合梯度等。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.7210
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.7607
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 0.4724

0.47240519523620605

我们在这里使用了 tf.data.Dataset 来提供训练和评估输入。您还可以使用 Numpy 数组:

import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
10/10 [==============================] - 0s 2ms/step - loss: 0.3362
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1486

<tensorflow.python.keras.callbacks.History at 0x7ff2b85af9b0>

在上述两种情况(数据集或 Numpy)中,给定输入的每个批次都被平均分到了多个副本中。例如,如果对 2 个 GPU 使用 MirroredStrategy,大小为 10 的每个批次将被均分到 2 个 GPU 中,每个 GPU 每步会接收 5 个输入样本。如果添加更多 GPU,每个周期的训练速度就会更快。在添加更多加速器时通常需要增加批次大小,以便有效利用额外的计算能力。您还需要根据模型重新调整学习率。您可以使用 strategy.num_replicas_in_sync 获得副本数量。

# Compute global batch size using number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

目前支持的策略

训练 API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras API 支持 支持 实验性支持 实验性支持 计划于 2.3 后支持

示例和教程

下列教程和示例完整演示了上述集成到 Keras 的过程:

  1. 使用 MirroredStrategy 训练 MNIST 的教程
  2. 使用 MultiWorkerMirroredStrategy 训练 MNIST 的教程
  3. 使用 TPUStrategy 训练 MNIST 的指南
  4. 包含使用各种策略实现的最先进模型集合的 TensorFlow Model Garden 仓库

在自定义训练循环中使用 tf.distribute.Strategy

如您所见,在 Keras model.fit 中使用 tf.distribute.Strategy 只需改动几行代码。再多花点功夫,您还可以在自定义训练循环中使用 tf.distribute.Strategy

如果您需要更多相对于使用 Estimator 或 Keras 时的灵活性和对训练循环的控制权,您可以编写自定义训练循环。例如,在使用 GAN 时,您可能会希望每轮使用不同数量的生成器或判别器步骤。同样,高级框架也不太适合强化学习训练。

为了支持自定义训练循环,我们通过 tf.distribute.Strategy 类提供了一组核心方法。使用这些方法可能需要在开始时对代码进行轻微重构,但完成重构后,您只需更改策略实例就能够在 GPU、TPU 和多台机器之间进行切换。

下面我们将用一个简短的代码段说明此用例,其中的简单训练样本使用与之前相同的 Keras 模型。

首先,在该策略的作用域内创建模型和优化器。这样可以确保使用此模型和优化器创建的任何变量都是镜像变量。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

接下来,我们创建输入数据集并调用 tf.distribute.Strategy.experimental_distribute_dataset,以根据策略分布数据集。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

然后,我们定义一个训练步骤。我们将使用 tf.GradientTape 来计算梯度,并使用优化器来应用这些梯度以更新模型变量。要分布此训练步骤,我们加入一个 train_step 函数,并将此函数和从之前创建的 dist_dataset 获得的数据集输入一起传递给 tf.distrbute.Strategy.run

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

以上代码还需注意以下几点:

  1. 我们使用了 tf.nn.compute_average_loss 来计算损失。tf.nn.compute_average_loss 将每个样本的损失相加,然后将总和除以 global_batch_size。这很重要,因为稍后在每个副本上计算出梯度后,会通过对它们求和使其在副本中聚合。
  2. 我们使用了 tf.distribute.Strategy.reduce API 来聚合 tf.distribute.Strategy.run 返回的结果。tf.distribute.Strategy.run 会从策略中的每个本地副本返回结果,且有多种方法使用该结果。可以 reduce 它们以获得聚合值。还可以通过执行 tf.distribute.Strategy.experimental_local_results 获得包含在结果中的值的列表,每个本地副本一个列表。
  3. 当在一个分布策略作用域内调用 apply_gradients 时,它的行为会被修改。具体来说,在同步训练期间,在将梯度应用于每个并行实例之前,它会对梯度的所有副本求和。

最后,当我们定义完训练步骤后,就可以迭代 dist_dataset,并在循环中运行训练:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(1.5814351, shape=(), dtype=float32)
tf.Tensor(1.5688368, shape=(), dtype=float32)
tf.Tensor(1.5563213, shape=(), dtype=float32)
tf.Tensor(1.5438889, shape=(), dtype=float32)
tf.Tensor(1.5315396, shape=(), dtype=float32)
tf.Tensor(1.5192736, shape=(), dtype=float32)
tf.Tensor(1.5070914, shape=(), dtype=float32)
tf.Tensor(1.4949927, shape=(), dtype=float32)
tf.Tensor(1.4829779, shape=(), dtype=float32)
tf.Tensor(1.4710473, shape=(), dtype=float32)
tf.Tensor(1.4592004, shape=(), dtype=float32)
tf.Tensor(1.4474381, shape=(), dtype=float32)
tf.Tensor(1.43576, shape=(), dtype=float32)
tf.Tensor(1.4241662, shape=(), dtype=float32)
tf.Tensor(1.4126568, shape=(), dtype=float32)
tf.Tensor(1.401232, shape=(), dtype=float32)
tf.Tensor(1.3898916, shape=(), dtype=float32)
tf.Tensor(1.3786359, shape=(), dtype=float32)
tf.Tensor(1.3674647, shape=(), dtype=float32)
tf.Tensor(1.3563778, shape=(), dtype=float32)

在上面的示例中,我们通过迭代 dist_dataset 为训练提供输入。我们还提供 tf.distribute.Strategy.make_experimental_numpy_dataset 以支持 Numpy 输入。您可以在调用 tf.distribute.Strategy.experimental_distribute_dataset 之前使用此 API 来创建数据集。

迭代数据的另一种方法是显式地使用迭代器。当您希望运行给定数量的步骤而非迭代整个数据集时,可能会用到此方法。现在可以将上面的迭代修改为:先创建迭代器,然后在迭代器上显式地调用 next 以获得输入数据。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(1.3453755, shape=(), dtype=float32)
tf.Tensor(1.3344578, shape=(), dtype=float32)
tf.Tensor(1.3236245, shape=(), dtype=float32)
tf.Tensor(1.3128753, shape=(), dtype=float32)
tf.Tensor(1.3022105, shape=(), dtype=float32)
tf.Tensor(1.2916298, shape=(), dtype=float32)
tf.Tensor(1.2811332, shape=(), dtype=float32)
tf.Tensor(1.2707204, shape=(), dtype=float32)
tf.Tensor(1.2603915, shape=(), dtype=float32)
tf.Tensor(1.2501462, shape=(), dtype=float32)

上面是使用 tf.distribute.Strategy API 来分布自定义训练循环最简单的情况。我们正在改进这些 API。由于此用例还需进一步调整才能适应您的代码,我们未来会发布单独的详细指南。

目前支持的策略

训练 API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
自定义训练循环 支持 支持 实验性支持 实验性支持 计划于 2.3 后支持

示例和教程

下面是在自定义训练循环中使用分布策略的一些示例:

  1. 使用 MirroredStrategy 训练 MNIST 的教程
  2. 使用 TPUStrategy 训练 MNIST 的指南
  3. 包含使用各种策略实现的最先进模型集合的 TensorFlow Model Garden 仓库

在 Estimator 中使用 tf.distribute.Strategy(有限支持)

tf.estimator 是分布式训练 TensorFlow API,最初支持异步参数服务器方法。与 Keras 类似,我们已将 tf.distribute.Strategy 集成到 tf.Estimator。如果您正在使用 Estimator 进行训练,那么您只需改动少量代码即可轻松转换为分布式训练。借助此功能,Estimator 用户现在可以在多个 GPU 和多个工作进程以及 TPU 上进行同步分布式训练。但是,Estimator 的这种支持是有限的。有关详细信息,请参阅下文目前支持的策略部分。

在 Estimator 中使用 tf.distribute.Strategy 的方法与 Keras 略有不同。现在我们不使用 strategy.scope,而是将策略对象传递到 Estimator 的 RunConfig 中。

以下代码段使用预制 Estimator LinearRegressorMirroredStrategy 展示了这种情况:

mirrored_strategy = tf.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpv0t1p98n
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpv0t1p98n', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7ff2b812b160>, '_device_fn': None, '_protocol': None, '_eval_distribute': <tensorflow.python.distribute.mirrored_strategy.MirroredStrategy object at 0x7ff2b812b160>, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}

我们在这里使用了预制 Estimator,但同样的代码也适用于自定义 Estimator。train_distribute 决定训练如何分布,eval_distribute 决定评估如何分布。这是与 Keras 的另一个区别,在 Keras 中,我们会对训练和评估使用相同的策略。

现在,我们可以使用输入函数来训练和评估这个 Estimator:

def input_fn():
  dataset = tf.data.Dataset.from_tensors(({"feats":[1.]}, [1.]))
  return dataset.repeat(1000).batch(10)
regressor.train(input_fn=input_fn, steps=10)
regressor.evaluate(input_fn=input_fn, steps=10)
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/canned/linear.py:1481: Layer.add_variable (from tensorflow.python.keras.engine.base_layer_v1) is deprecated and will be removed in a future version.
Instructions for updating:
Please use `layer.add_weight` method instead.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7ff2b01248c8> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7ff2b01248c8> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpv0t1p98n/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 1.0, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 10...
INFO:tensorflow:Saving checkpoints for 10 into /tmp/tmpv0t1p98n/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 10...
INFO:tensorflow:Loss for final step: 2.877698e-13.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
WARNING:tensorflow:AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7ff2b017c0d0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
WARNING: AutoGraph could not transform <function _combine_distributed_scaffold.<locals>.<lambda> at 0x7ff2b017c0d0> and will run it as-is.
Cause: could not parse the source code:

      lambda scaffold: scaffold.ready_op, args=(grouped_scaffold,))

This error may be avoided by creating the lambda in a standalone statement.

To silence this warning, decorate the function with @tf.autograph.experimental.do_not_convert
INFO:tensorflow:Starting evaluation at 2020-09-22T19:10:02Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpv0t1p98n/model.ckpt-10
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [1/10]
INFO:tensorflow:Evaluation [2/10]
INFO:tensorflow:Evaluation [3/10]
INFO:tensorflow:Evaluation [4/10]
INFO:tensorflow:Evaluation [5/10]
INFO:tensorflow:Evaluation [6/10]
INFO:tensorflow:Evaluation [7/10]
INFO:tensorflow:Evaluation [8/10]
INFO:tensorflow:Evaluation [9/10]
INFO:tensorflow:Evaluation [10/10]
INFO:tensorflow:Inference Time : 0.24472s
INFO:tensorflow:Finished evaluation at 2020-09-22-19:10:02
INFO:tensorflow:Saving dict for global step 10: average_loss = 1.4210855e-14, global_step = 10, label/mean = 1.0, loss = 1.4210855e-14, prediction/mean = 0.99999994
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 10: /tmp/tmpv0t1p98n/model.ckpt-10

{'average_loss': 1.4210855e-14,
 'label/mean': 1.0,
 'loss': 1.4210855e-14,
 'prediction/mean': 0.99999994,
 'global_step': 10}

需要在这里强调的 Estimator 和 Keras 的另一个区别是输入处理。在 Keras 中,我们提到过数据集的每个批次都会在多个副本之间自动拆分。但在 Estimator 中,批次不会自动拆分,也不会在不同的工作进程之间对数据进行分片处理。您可以完全控制数据在工作进程和设备之间的分布方式,而且您必须提供 input_fn 来指定数据的分布方式。

每个工作进程都会调用一次 input_fn,从而为每个工作进程提供一个数据集。然后数据集中的一个批次会被馈送到此工作进程上的一个副本,因此,1 个工作进程上的 N 个副本要使用 N 个批次。换句话说,input_fn 返回的数据集应提供大小为 PER_REPLICA_BATCH_SIZE 的批次。步骤的全局批次大小可通过 PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync 获得。

在进行多工作进程训练时,您应该将数据拆分至各个工作进程,或者在每个工作进程上打乱随机种子的顺序。您可以在使用 Estimator 进行多工作进程训练中参阅有关此操作的示例。

同样,您也可以使用多工作进程和参数服务器策略。代码保持不变,但需要使用 tf.estimator.train_and_evaluate,并为集群中运行的每个二进制文件设置 TF_CONFIG 环境变量。

目前支持的策略

TPUStrategy 外,所有策略都对使用 Estimator 的训练提供有限支持。基本训练和评估应该可以正常运行,但如基架之类的许多高级功能尚不可用。此集成中可能还存在许多错误。目前,我们不打算主动改进此支持,而是专注于对 Keras 和自定义训练循环的支持。如果可能,您应该会更喜欢在这些 API 中使用 tf.distribute

训练 API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Estimator API 有限支持 不支持 有限支持 有限支持 有限支持

示例和教程

下列示例展示了 Estimator 中各种策略的完整用法:

  1. 使用 MultiWorkerMirroredStrategy 通过多个工作进程训练 MNIST 的使用 Estimator 进行多工作进程训练
  2. 使用 Kubernetes 模板在 tensorflow/ecosystem 中进行多工作进程训练的完整示例。本示例从 Keras 模型开始,然后使用 tf.keras.estimator.model_to_estimator API 将其转换为 Estimator。
  3. 可以使用 MirroredStrategyMultiWorkerMirroredStrategy 进行训练的官方 ResNet50 模型。

其他主题

在此部分,我们将介绍与多个用例相关的主题。

设置 TF_CONFIG 环境变量

对于多工作进程训练来说,如前所述,您需要为每个在集群中运行的二进制文件设置 TF_CONFIG 环境变量。TF_CONFIG 环境变量是一个 JSON 字符串,它指定了构成集群的任务、它们的地址,以及每个任务在集群中的角色。我们在 tensorflow/ecosystem 仓库中提供了一个 Kubernetes 模板,可为您的训练任务设置 TF_CONFIG

TF_CONFIG 有两个组件:cluster 和 task。cluster 会提供有关训练集群的信息,这是一个由不同类型的作业(如工作进程)组成的字典。在多工作进程训练中,通常会有一个工作进程除了要完成常规工作进程的工作之外,还要承担更多责任,如保存检查点和为 TensorBoard 编写摘要文件。此类工作进程称为 "chief" 工作进程,习惯上会将索引为 0 的工作进程指定为 chief 工作进程(实际上这是 tf.distribute.Strategy 的实现方式)。另一方面,task 会提供有关当前任务的信息。第一个组件 cluster 对于所有工作进程都相同,而第二个组件 task 在每个工作进程上均不相同,并指定了该工作进程的类型和索引。

TF_CONFIG 的示例如下:

os.environ["TF_CONFIG"] = json.dumps({     "cluster": {         "worker": ["host1:port", "host2:port", "host3:port"],         "ps": ["host4:port", "host5:port"]     },    "task": {"type": "worker", "index": 1} })

TF_CONFIG 指定了集群中包含三个工作进程和两个 ps 任务,以及它们的主机和端口。"task" 部分指定当前任务在集群中的角色,即 worker 1(第二个工作进程)。集群中的有效角色是 "chief"、"worker"、"ps" 和 "evaluator"。除使用 tf.distribute.experimental.ParameterServerStrategy 时外,不应有 "ps" 作业。

后续计划

我们正在积极开发 tf.distribute.Strategy。欢迎您试用,并通过 GitHub 问题提供反馈。