Help protect the Great Barrier Reef with TensorFlow on Kaggle Join Challenge

Use TPUs

View on TensorFlow.org Run in Google Colab View source on GitHub Download notebook

Before you run this Colab notebook, make sure that your hardware accelerator is a TPU by checking your notebook settings: Runtime > Change runtime type > Hardware accelerator > TPU.

Setup

import tensorflow as tf

import os
import tensorflow_datasets as tfds
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.7) or chardet (2.3.0)/charset_normalizer (2.0.8) doesn't match a supported version!
  RequestsDependencyWarning)

TPU initialization

TPUs are typically Cloud TPU workers, which are different from the local process running the user's Python program. Thus, you need to do some initialization work to connect to the remote cluster and initialize the TPUs. Note that the tpu argument to tf.distribute.cluster_resolver.TPUClusterResolver is a special address just for Colab. If you are running your code on Google Compute Engine (GCE), you should instead pass in the name of your Cloud TPU.

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.2:8470
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.2:8470
INFO:tensorflow:Finished initializing TPU system.
INFO:tensorflow:Finished initializing TPU system.
All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]

Manual device placement

After the TPU is initialized, you can use manual device placement to place the computation on a single TPU device:

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

with tf.device('/TPU:0'):
  c = tf.matmul(a, b)

print("c device: ", c.device)
print(c)
c device:  /job:worker/replica:0/task:0/device:TPU:0
tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)

Distribution strategies

Usually you run your model on multiple TPUs in a data-parallel way. To distribute your model on multiple TPUs (or other accelerators), TensorFlow offers several distribution strategies. You can replace your distribution strategy and the model will run on any given (TPU) device. Check the distribution strategy guide for more information.

To demonstrate this, create a tf.distribute.TPUStrategy object:

strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system:
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

To replicate a computation so it can run in all TPU cores, you can pass it into the strategy.run API. Below is an example that shows all cores receiving the same inputs (a, b) and performing matrix multiplication on each core independently. The outputs will be the values from all the replicas.

@tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{
  0: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  1: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  2: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  3: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  4: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  5: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  6: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  7: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)
}

Classification on TPUs

Having covered the basic concepts, consider a more concrete example. This section demonstrates how to use the distribution strategy—tf.distribute.TPUStrategy—to train a Keras model on a Cloud TPU.

Define a Keras model

Start with a definition of a Sequential Keras model for image classification on the MNIST dataset using Keras. It's no different than what you would use if you were training on CPUs or GPUs. Note that Keras model creation needs to be inside strategy.scope, so the variables can be created on each TPU device. Other parts of the code are not necessary to be inside the strategy scope.

def create_model():
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(256, 3, activation='relu'),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256, activation='relu'),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)])

Load the dataset

Efficient use of the tf.data.Dataset API is critical when using a Cloud TPU, as it is impossible to use the Cloud TPUs unless you can feed them data quickly enough. You can learn more about dataset performance in the Input pipeline performance guide.

For all but the simplest experiments (using tf.data.Dataset.from_tensor_slices or other in-graph data), you need to store all data files read by the Dataset in Google Cloud Storage (GCS) buckets.

For most use cases, it is recommended to convert your data into the TFRecord format and use a tf.data.TFRecordDataset to read it. Check the TFRecord and tf.Example tutorial for details on how to do this. It is not a hard requirement and you can use other dataset readers, such as tf.data.FixedLengthRecordDataset or tf.data.TextLineDataset.

You can load entire small datasets into memory using tf.data.Dataset.cache.

Regardless of the data format used, it is strongly recommended that you use large files on the order of 100MB. This is especially important in this networked setting, as the overhead of opening a file is significantly higher.

As shown in the code below, you should use the tensorflow_datasets module to get a copy of the MNIST training and test data. Note that try_gcs is specified to use a copy that is available in a public GCS bucket. If you don't specify this, the TPU will not be able to access the downloaded data.

def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  # Normalize the input data.
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage of having an
  # infinite dataset for training is to avoid the potential last partial batch
  # in each epoch, so that you don't need to think about scaling the gradients
  # based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset

Train the model using Keras high-level APIs

You can train your model with Keras fit and compile APIs. There is nothing TPU-specific in this step—you write the code as if you were using mutliple GPUs and a MirroredStrategy instead of the TPUStrategy. You can learn more in the Distributed training with Keras tutorial.

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset, 
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 18s 32ms/step - loss: 0.1291 - sparse_categorical_accuracy: 0.9590 - val_loss: 0.0410 - val_sparse_categorical_accuracy: 0.9871
Epoch 2/5
300/300 [==============================] - 6s 22ms/step - loss: 0.0335 - sparse_categorical_accuracy: 0.9892 - val_loss: 0.0492 - val_sparse_categorical_accuracy: 0.9847
Epoch 3/5
300/300 [==============================] - 6s 21ms/step - loss: 0.0205 - sparse_categorical_accuracy: 0.9941 - val_loss: 0.0457 - val_sparse_categorical_accuracy: 0.9868
Epoch 4/5
300/300 [==============================] - 7s 22ms/step - loss: 0.0115 - sparse_categorical_accuracy: 0.9960 - val_loss: 0.0428 - val_sparse_categorical_accuracy: 0.9891
Epoch 5/5
300/300 [==============================] - 6s 21ms/step - loss: 0.0110 - sparse_categorical_accuracy: 0.9965 - val_loss: 0.0592 - val_sparse_categorical_accuracy: 0.9853
<keras.callbacks.History at 0x7f25fc108a58>

To reduce Python overhead and maximize the performance of your TPU, pass in the argument—steps_per_execution—to Model.compile. In this example, it increases throughput by about 50%:

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 13s 42ms/step - loss: 0.1343 - sparse_categorical_accuracy: 0.9594 - val_loss: 0.0487 - val_sparse_categorical_accuracy: 0.9837
Epoch 2/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0348 - sparse_categorical_accuracy: 0.9891 - val_loss: 0.0401 - val_sparse_categorical_accuracy: 0.9876
Epoch 3/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0187 - sparse_categorical_accuracy: 0.9940 - val_loss: 0.0517 - val_sparse_categorical_accuracy: 0.9844
Epoch 4/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0123 - sparse_categorical_accuracy: 0.9963 - val_loss: 0.0506 - val_sparse_categorical_accuracy: 0.9860
Epoch 5/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0080 - sparse_categorical_accuracy: 0.9974 - val_loss: 0.0456 - val_sparse_categorical_accuracy: 0.9878
<keras.callbacks.History at 0x7f23ac7782b0>

Train the model using a custom training loop

You can also create and train your model using tf.function and tf.distribute APIs directly. You can use the strategy.experimental_distribute_datasets_from_function API to distribute the dataset given a dataset function. Note that in the example below the batch size passed into the dataset is the per-replica batch size instead of the global batch size. To learn more, check out the Custom training with tf.distribute.Strategy tutorial.

First, create the model, datasets and tf.functions:

# Create the model, optimizer and metrics inside the strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function

Then, run the training loop:

steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()
Epoch: 0/5
Current step: 300, training loss: 0.1436, accuracy: 95.42%
Epoch: 1/5
Current step: 600, training loss: 0.0342, accuracy: 98.94%
Epoch: 2/5
Current step: 900, training loss: 0.0205, accuracy: 99.36%
Epoch: 3/5
Current step: 1200, training loss: 0.0123, accuracy: 99.59%
Epoch: 4/5
Current step: 1500, training loss: 0.0089, accuracy: 99.68%

Improving performance with multiple steps inside tf.function

You can improve the performance by running multiple steps within a tf.function. This is achieved by wrapping the strategy.run call with a tf.range inside tf.function, and AutoGraph will convert it to a tf.while_loop on the TPU worker.

Despite the improved performance, there are tradeoffs with this method compared to running a single step inside tf.function. Running multiple steps in a tf.function is less flexible—you cannot run things eagerly or arbitrary Python code within the steps.

@tf.function
def train_multiple_steps(iterator, steps):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get 
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0093, accuracy: 99.68%

Next steps