Use TPUs

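Experimental support for Cloud TPUs is currently available for Keras and Google Colab. Before you run this Colab notebook, check your notebook settings to make sure that your hardware accelerator is a TPU: Runtime > Change runtime type > Hardware accelerator > TPU.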

Setup

import tensorflow as tf

import os
import tensorflow_datasets as tfds

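TPU initialization

TPUs usually sit on Cloud TPU workers, which are separate from the local process that runs the user's Python program. Some initialization work is therefore needed to connect to the remote cluster and initialize the TPUs. Note that the tpu argument passed to TPUClusterResolver here is a special address that only works in Colab. If you are running on Google Compute Engine (GCE), pass the name of your Cloud TPU instead.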

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.2:8470
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Finished initializing TPU system.

All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU')]
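The choice of tpu argument described above can be sketched as a small helper. This is only an illustration and not part of the TensorFlow API: resolve_tpu_argument and its tpu_name parameter are hypothetical names, and the sketch assumes that Colab exposes the TPU address in the COLAB_TPU_ADDR environment variable, as the initialization code above does.

```python
import os


def resolve_tpu_argument(environ=None, tpu_name=None):
    """Return a value to pass as `tpu=` to TPUClusterResolver (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    colab_addr = environ.get('COLAB_TPU_ADDR')
    if colab_addr:
        # Colab: connect to the gRPC address of the attached TPU worker.
        return 'grpc://' + colab_addr
    if tpu_name:
        # GCE: pass the name of the Cloud TPU instead.
        return tpu_name
    raise ValueError('No TPU found: set COLAB_TPU_ADDR or pass tpu_name.')


# Example with a made-up address; no TPU is contacted here.
example = resolve_tpu_argument({'COLAB_TPU_ADDR': '10.0.0.2:8470'})
```

The returned string would then be handed to TPUClusterResolver in place of the hard-coded expression.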

Manual device placement

After the TPU is initialized, you can use manual device placement to run a computation on a single TPU device.

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
  c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
c device:  /job:worker/replica:0/task:0/device:TPU:0
tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)
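To check the result above by hand, the same product can be computed in plain Python. This is a minimal sketch that does not use TensorFlow; the matmul helper and the a_rows, b_rows and c_rows names are introduced here for illustration only.

```python
def matmul(a, b):
    """Multiply two matrices given as lists of rows."""
    inner = len(b)
    assert all(len(row) == inner for row in a), 'inner dimensions must match'
    cols = len(b[0])
    return [[sum(row[k] * b[k][j] for k in range(inner)) for j in range(cols)]
            for row in a]


a_rows = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]    # shape (2, 3)
b_rows = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]  # shape (3, 2)
c_rows = matmul(a_rows, b_rows)                # shape (2, 2)
```

For example, the top-left entry is 1*1 + 2*3 + 3*5 = 22, which matches the tensor printed by the TPU.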

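Distribution strategies

Most of the time, users want to run a model on multiple TPUs in a data-parallel way. A distribution strategy is an abstraction that can be used to drive models on CPUs, GPUs or TPUs. Simply swap out the distribution strategy and the model runs on the given device. See the distribution strategy guide for more information.

First, create the TPUStrategy object.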

strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

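To replicate a computation so that it runs on all TPU cores, pass it to the strategy.run API. In the example below, every core receives the same inputs (a, b) and performs the matmul independently. The output contains the values from all replicas.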

@tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{
  0: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  1: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  2: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  3: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  4: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  5: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  6: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  7: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)
}
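The shape of the PerReplica result above can be mimicked with a toy model in plain Python. This is only a conceptual sketch under loud assumptions: simulate_run is a hypothetical stand-in that calls the function sequentially on the host, whereas strategy.run dispatches the computation to the TPU cores.

```python
def simulate_run(fn, args, num_replicas):
    """Toy stand-in for strategy.run: call fn once per replica with the same args."""
    return {replica_id: fn(*args) for replica_id in range(num_replicas)}


def dot(x, y):
    """Dot product of two equal-length vectors."""
    return sum(xi * yi for xi, yi in zip(x, y))


# Every replica receives identical inputs, so every replica returns the same value.
per_replica = simulate_run(dot, ([1.0, 2.0, 3.0], [1.0, 3.0, 5.0]), num_replicas=8)
```

As in the real output, there is one entry per replica, keyed by replica id, and all entries are equal because the inputs were identical.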

Classification on TPUs

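Now that the basic concepts are covered, it is time to look at a more concrete example. This guide demonstrates how to use the distribution strategy tf.distribute.TPUStrategy (the class instantiated above) to drive a Cloud TPU and train a Keras model.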

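Define a Keras model

Below is the definition of an MNIST model using Keras, unchanged from what you would use on a CPU or GPU. Note that the Keras model must be created inside strategy.scope so that the variables can be created on each TPU device. Other parts of the code do not need to be inside the strategy scope.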

def create_model():
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(256, 3, activation='relu'),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256, activation='relu'),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)])
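To get a feel for how many variables are created under strategy.scope, the parameter count of this model can be worked out by hand. The sketch below is plain Python and assumes the Keras defaults for Conv2D (stride 1, 'valid' padding, bias enabled); the helper names are introduced here for illustration.

```python
def conv2d_params(kernel, in_channels, filters):
    # kernel x kernel weights per (input channel, filter) pair, plus one bias per filter.
    return kernel * kernel * in_channels * filters + filters


def dense_params(inputs, units):
    # One weight per (input, unit) pair, plus one bias per unit.
    return inputs * units + units


side, channels = 28, 1            # MNIST input: 28 x 28 x 1
params = []
for filters in (256, 256):
    params.append(conv2d_params(3, channels, filters))
    side -= 2                     # a 3x3 kernel with 'valid' padding trims 2 pixels
    channels = filters
features = side * side * channels  # Flatten: 24 * 24 * 256
for units in (256, 128, 10):
    params.append(dense_params(features, units))
    features = units
total_params = sum(params)
```

Almost all of the roughly 38 million parameters sit in the first Dense layer, which consumes the 147,456 flattened features.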

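Input datasets

Efficient use of the tf.data.Dataset API is critical when using a Cloud TPU, because a Cloud TPU cannot be put to use unless you can feed it data quickly enough. See the Input Pipeline Performance Guide for details on dataset performance.

For all but the simplest experiments (those using tf.data.Dataset.from_tensor_slices or other in-graph data), you need to store all data files read by the dataset in Google Cloud Storage (GCS) buckets.

For most use cases, it is recommended to convert your data into the TFRecord format and read it with tf.data.TFRecordDataset. See the TFRecord and tf.Example tutorial for details on how to do this. This is not a hard requirement, however, and you can use other dataset readers (FixedLengthRecordDataset or TextLineDataset) if you prefer.

Small datasets can be loaded entirely into memory using tf.data.Dataset.cache.

Regardless of the data format used, it is strongly recommended that you use large files, on the order of 100MB. This is especially important in this networked setting, because the overhead of opening a file is significantly higher.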
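The file-size recommendation above translates into a simple shard-count calculation. The following is a rough sketch, not a TensorFlow utility: num_shards is a hypothetical helper, and the 100MB target is taken as 100 * 1024 * 1024 bytes for concreteness.

```python
import math

MB = 1024 * 1024


def num_shards(total_bytes, target_shard_bytes=100 * MB):
    """Smallest number of files so that no file exceeds the target size."""
    return max(1, math.ceil(total_bytes / target_shard_bytes))


# A 1 GiB dataset split into files of at most ~100MB each.
shards_for_1gib = num_shards(1024 * MB)
```

With these assumptions a 1 GiB dataset ends up in 11 files, while anything under the target size stays in a single file.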

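Here you should use the tensorflow_datasets module to get a copy of the MNIST training data. Note that try_gcs is specified so that a copy available in a public GCS bucket is used. If you do not specify it, the TPU will not be able to access the downloaded data.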

def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0

    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage of having an
  # infinite dataset for training is that it avoids a potential partial last
  # batch in each epoch, so users don't need to think about scaling the
  # gradients based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset
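The comment about the partial last batch can be made concrete with a little arithmetic. This sketch is plain Python; batches_per_epoch is a hypothetical helper, and 60000 is the size of the MNIST training split used later in this guide.

```python
def batches_per_epoch(num_examples, batch_size):
    """Return (full_batches, size_of_trailing_partial_batch)."""
    return divmod(num_examples, batch_size)


mnist_train = 60000  # size of the MNIST training split
even = batches_per_epoch(mnist_train, 200)    # divides evenly
uneven = batches_per_epoch(mnist_train, 256)  # leaves a partial batch
```

A batch size of 200 divides the split evenly, but a batch size of 256 would leave a final batch of only 96 examples in each epoch if the dataset were not repeated before batching.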

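Train a model using Keras high-level APIs

You can train a model simply with the Keras fit/compile APIs. Nothing here is TPU specific: you would write the same code below if you had multiple GPUs and used a MirroredStrategy rather than a TPUStrategy. To learn more, check out the Distributed training with Keras tutorial.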

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset, 
          validation_steps=validation_steps)
Epoch 1/5
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.

  1/300 [..............................] - ETA: 20:25 - loss: 2.3062 - sparse_categorical_accuracy: 0.0500
WARNING:tensorflow:Callbacks method `on_train_batch_end` is slow compared to the batch time (batch time: 0.0015s vs `on_train_batch_end` time: 0.0237s). Check your callbacks.
300/300 [==============================] - ETA: 0s - loss: 0.1450 - sparse_categorical_accuracy: 0.9561
WARNING:tensorflow:Callbacks method `on_test_batch_end` is slow compared to the batch time (batch time: 0.0013s vs `on_test_batch_end` time: 0.0097s). Check your callbacks.
300/300 [==============================] - 15s 50ms/step - loss: 0.1450 - sparse_categorical_accuracy: 0.9561 - val_loss: 0.0456 - val_sparse_categorical_accuracy: 0.9852
Epoch 2/5
300/300 [==============================] - 8s 28ms/step - loss: 0.0357 - sparse_categorical_accuracy: 0.9890 - val_loss: 0.0372 - val_sparse_categorical_accuracy: 0.9884
Epoch 3/5
300/300 [==============================] - 9s 28ms/step - loss: 0.0193 - sparse_categorical_accuracy: 0.9940 - val_loss: 0.0557 - val_sparse_categorical_accuracy: 0.9835
Epoch 4/5
300/300 [==============================] - 9s 29ms/step - loss: 0.0141 - sparse_categorical_accuracy: 0.9954 - val_loss: 0.0405 - val_sparse_categorical_accuracy: 0.9883
Epoch 5/5
300/300 [==============================] - 9s 29ms/step - loss: 0.0092 - sparse_categorical_accuracy: 0.9967 - val_loss: 0.0428 - val_sparse_categorical_accuracy: 0.9887

<tensorflow.python.keras.callbacks.History at 0x7f480c793240>

To reduce Python overhead and maximize the performance of your TPU, try the experimental experimental_steps_per_execution argument to Model.compile. Here it increases throughput by about 50%:

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                experimental_steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 16s 52ms/step - loss: 0.1337 - sparse_categorical_accuracy: 0.9583 - val_loss: 0.0521 - val_sparse_categorical_accuracy: 0.9840
Epoch 2/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0331 - sparse_categorical_accuracy: 0.9898 - val_loss: 0.0360 - val_sparse_categorical_accuracy: 0.9884
Epoch 3/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0188 - sparse_categorical_accuracy: 0.9939 - val_loss: 0.0405 - val_sparse_categorical_accuracy: 0.9887
Epoch 4/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0117 - sparse_categorical_accuracy: 0.9961 - val_loss: 0.0808 - val_sparse_categorical_accuracy: 0.9820
Epoch 5/5
300/300 [==============================] - 5s 16ms/step - loss: 0.0119 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0488 - val_sparse_categorical_accuracy: 0.9862

<tensorflow.python.keras.callbacks.History at 0x7f47ac791240>
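One way to see where the saving comes from is to count how often the host has to launch work on the TPU per epoch. The sketch below is plain Python and assumes that each execution runs up to steps_per_execution batches; executions_per_epoch is a hypothetical helper, not a Keras API.

```python
import math


def executions_per_epoch(steps_per_epoch, steps_per_execution):
    """Number of host-side launches needed to cover one epoch."""
    return math.ceil(steps_per_epoch / steps_per_execution)


one_step_at_a_time = executions_per_epoch(300, 1)
fifty_steps_at_a_time = executions_per_epoch(300, 50)
```

With the values used above, the host launches 6 executions per epoch instead of 300, which is why the per-step Python overhead shrinks.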

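Train a model using a custom training loop

You can also create and train your models using the tf.function and tf.distribute APIs directly. The strategy.experimental_distribute_datasets_from_function API distributes the dataset produced by a given dataset function. Note that in this case the batch size passed to the dataset is the per-replica batch size, not the global batch size. To learn more, check out the Custom training with tf.distribute.Strategy tutorial.

First, create the model, the datasets and the tf.functions.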

# Create the model, optimizer and metrics inside strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step"""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))
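The loss scaling in step_fn can be checked with plain arithmetic. The sketch below uses made-up per-example losses and mirrors what tf.nn.compute_average_loss does (sum of the per-example losses divided by the global batch size); the variable names are illustrative only.

```python
global_batch_size = 200
num_replicas = 8
per_replica_batch_size = global_batch_size // num_replicas  # 25 examples per replica

# Made-up per-example losses seen by one replica.
per_example_loss = [0.5] * 10 + [1.5] * 15
assert len(per_example_loss) == per_replica_batch_size

# Scaled loss on this replica: sum of its losses divided by the GLOBAL batch size.
scaled_loss = sum(per_example_loss) / global_batch_size

# Multiplying by the replica count recovers this replica's mean loss,
# which is the value fed to the training_loss metric.
recovered = scaled_loss * num_replicas
replica_mean = sum(per_example_loss) / per_replica_batch_size
```

Dividing by the global batch size means that summing the gradients across replicas yields the gradient of the mean loss over the whole global batch, while the metric update undoes the scaling so that the reported loss stays a per-example average.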

Then run the training loop.

steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()
Epoch: 0/5
Current step: 300, training loss: 0.1335, accuracy: 95.8%
Epoch: 1/5
Current step: 600, training loss: 0.0344, accuracy: 98.93%
Epoch: 2/5
Current step: 900, training loss: 0.0196, accuracy: 99.35%
Epoch: 3/5
Current step: 1200, training loss: 0.0119, accuracy: 99.61%
Epoch: 4/5
Current step: 1500, training loss: 0.0102, accuracy: 99.65%

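Improving performance by running multiple steps within tf.function

Performance can be improved by running multiple steps within a tf.function. This is achieved by wrapping the strategy.run call in a tf.range loop inside the tf.function; AutoGraph converts the loop into a tf.while_loop on the TPU worker.

Although performance is better, there are tradeoffs compared with a single step inside tf.function. Running multiple steps in a tf.function is less flexible: you cannot run things eagerly or run arbitrary Python code within the steps.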

@tf.function
def train_multiple_steps(iterator, steps):
  """The step function for multiple training steps."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get 
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0087, accuracy: 99.72%
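The control flow of train_multiple_steps has a simple plain-Python analog: a single call consumes several batches from the same iterator, and the iterator keeps its position between calls. The sketch below is an illustration only; run_multiple_steps is a hypothetical helper, itertools.count stands in for the distributed dataset iterator, and nothing here involves AutoGraph or tf.while_loop.

```python
import itertools


def run_multiple_steps(iterator, steps, step_fn):
    """Plain-Python analog: one call consumes `steps` batches from the iterator."""
    for _ in range(steps):
        step_fn(next(iterator))


seen = []
batches = itertools.count()  # stand-in for a dataset iterator
run_multiple_steps(batches, 3, seen.append)
run_multiple_steps(batches, 2, seen.append)  # continues where the first call stopped
```

This is also why the step counter above continues from 1500 to 1800: the call reuses train_iterator and the same optimizer, so it simply carries on for another 300 steps.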

Next steps