![]() | ![]() | ![]() | ![]() |
La prise en charge expérimentale des Cloud TPU est actuellement disponible pour Keras et Google Colab. Avant d'exécuter ce notebook Colab, assurez-vous que votre accélérateur matériel est un TPU en vérifiant les paramètres de votre notebook: Runtime> Changer le type d'exécution> Accélérateur matériel> TPU.
Installer
import tensorflow as tf
import os
import tensorflow_datasets as tfds
Initialisation TPU
Les TPU se trouvent généralement sur des nœuds de calcul Cloud TPU qui sont différents du processus local exécutant le programme Python utilisateur. Ainsi, un travail d'initialisation doit être effectué pour se connecter au cluster distant et initialiser les TPU. Notez que l'argument tpu
de TPUClusterResolver
est une adresse spéciale uniquement pour Colab. Dans le cas où vous utilisez Google Compute Engine (GCE), vous devez plutôt transmettre le nom de votre CloudTPU.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.18:8470 INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.18:8470 INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. All devices: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU')]
Placement manuel de l'appareil
Une fois le TPU initialisé, vous pouvez utiliser le placement manuel du périphérique pour placer le calcul sur un seul périphérique TPU.
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
c device: /job:worker/replica:0/task:0/device:TPU:0 tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32)
Stratégies de distribution
La plupart du temps, les utilisateurs souhaitent exécuter le modèle sur plusieurs TPU de manière parallèle aux données. Une stratégie de distribution est une abstraction qui peut être utilisée pour piloter des modèles sur CPU, GPU ou TPU. Remplacez simplement la stratégie de distribution et le modèle fonctionnera sur l'appareil donné. Consultez le guide de stratégie de distribution pour plus d'informations.
Tout d'abord, crée l'objet TPUStrategy
.
strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
Pour répliquer un calcul afin qu'il puisse s'exécuter dans tous les cœurs TPU, vous pouvez simplement le transmettre à l'API strategy.run
. Voici un exemple que tous les cœurs obtiendront les mêmes entrées (a, b)
et feront le matmul sur chaque cœur indépendamment. Les sorties seront les valeurs de toutes les répliques.
@tf.function
def matmul_fn(x, y):
z = tf.matmul(x, y)
return z
z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{ 0: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 1: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 2: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 3: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 4: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 5: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 6: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 7: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32) }
Classification sur les TPU
Comme nous avons appris les concepts de base, il est temps de regarder un exemple plus concret. Ce guide explique comment utiliser la stratégie de distribution tf.distribute.TPUStrategy
pour piloter un Cloud TPU et entraîner un modèle Keras.
Définir un modèle Keras
Vous trouverez ci-dessous la définition du modèle MNIST utilisant Keras, inchangée par rapport à ce que vous utiliseriez sur le CPU ou le GPU. Notez que la création du modèle Keras doit être à l'intérieur de strategy.scope
, de sorte que les variables peuvent être créées sur chaque périphérique TPU. Les autres parties du code ne sont pas nécessaires pour être dans la portée de la stratégie.
def create_model():
return tf.keras.Sequential(
[tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(256, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)])
Ensembles de données d'entrée
L'utilisation efficace de l'APItf.data.Dataset
est essentielle lors de l'utilisation d'un Cloud TPU, car il est impossible d'utiliser les Cloud TPU à moins de pouvoir leur fournir des données assez rapidement. Consultez le Guide des performances du pipeline d'entrée pour plus de détails sur les performances de l'ensemble de données.
Pour toutes les expérimentations, sauf la plus simple (en utilisant tf.data.Dataset.from_tensor_slices
ou d'autres données dans le graphique), vous devrez stocker tous les fichiers de données lus par l'ensemble de données dans des buckets Google Cloud Storage (GCS).
Pour la plupart des cas d'utilisation, il est recommandé de convertir vos données au format TFRecord
et d'utiliser un tf.data.TFRecordDataset
pour les lire. Voir le didacticiel TFRecord et tf.Example pour plus de détails sur la façon de procéder. Cependant, ce n'est pas une exigence FixedLengthRecordDataset
et vous pouvez utiliser d'autres lecteurs de FixedLengthRecordDataset
données ( FixedLengthRecordDataset
ou TextLineDataset
) si vous préférez.
Les petits ensembles de données peuvent être chargés entièrement en mémoire à l'aide de tf.data.Dataset.cache
.
Quel que soit le format de données utilisé, il est fortement recommandé d'utiliser des fichiers volumineux, de l'ordre de 100 Mo. Ceci est particulièrement important dans ce cadre en réseau car la surcharge liée à l'ouverture d'un fichier est considérablement plus élevée.
Ici, vous devez utiliser le module tensorflow_datasets
pour obtenir une copie des données d'entraînement MNIST. Notez que try_gcs
est spécifié pour utiliser une copie disponible dans un bucket GCS public. Si vous ne le spécifiez pas, le TPU ne pourra pas accéder aux données téléchargées.
def get_dataset(batch_size, is_training=True):
split = 'train' if is_training else 'test'
dataset, info = tfds.load(name='mnist', split=split, with_info=True,
as_supervised=True, try_gcs=True)
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255.0
return image, label
dataset = dataset.map(scale)
# Only shuffle and repeat the dataset in training. The advantage to have a
# infinite dataset for training is to avoid the potential last partial batch
# in each epoch, so users don't need to think about scaling the gradients
# based on the actual batch size.
if is_training:
dataset = dataset.shuffle(10000)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
Former un modèle à l'aide des API de haut niveau Keras
Vous pouvez entraîner un modèle simplement avec les API d'ajustement / compilation Keras. Rien ici n'est spécifique à TPU, vous écririez le même code ci-dessous si vous aviez plusieurs GPU et que vous TPUStrategy
une TPUStrategy
MirroredStrategy
plutôt qu'une TPUStrategy
. Pour en savoir plus, consultez le didacticiel Formation distribuée avec Keras .
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size
train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 19s 37ms/step - loss: 0.3028 - sparse_categorical_accuracy: 0.9042 - val_loss: 0.0469 - val_sparse_categorical_accuracy: 0.9850 Epoch 2/5 300/300 [==============================] - 8s 25ms/step - loss: 0.0400 - sparse_categorical_accuracy: 0.9885 - val_loss: 0.0396 - val_sparse_categorical_accuracy: 0.9870 Epoch 3/5 300/300 [==============================] - 8s 25ms/step - loss: 0.0212 - sparse_categorical_accuracy: 0.9931 - val_loss: 0.0373 - val_sparse_categorical_accuracy: 0.9893 Epoch 4/5 300/300 [==============================] - 8s 25ms/step - loss: 0.0159 - sparse_categorical_accuracy: 0.9951 - val_loss: 0.0392 - val_sparse_categorical_accuracy: 0.9875 Epoch 5/5 300/300 [==============================] - 8s 25ms/step - loss: 0.0096 - sparse_categorical_accuracy: 0.9968 - val_loss: 0.0463 - val_sparse_categorical_accuracy: 0.9879 <tensorflow.python.keras.callbacks.History at 0x7f16c5c9e908>
Pour réduire la surcharge de Python et maximiser les performances de votre TPU, essayez l'argument expérimental experimental_steps_per_execution
dans Model.compile
. Ici, il augmente le débit d'environ 50%:
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
# Anything between 2 and `steps_per_epoch` could help here.
experimental_steps_per_execution = 50,
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
WARNING:tensorflow:The argument `steps_per_execution` is no longer experimental. Pass `steps_per_execution` instead of `experimental_steps_per_execution`. Warning:tensorflow:The argument `steps_per_execution` is no longer experimental. Pass `steps_per_execution` instead of `experimental_steps_per_execution`. Epoch 1/5 300/300 [==============================] - 14s 45ms/step - loss: 0.2200 - sparse_categorical_accuracy: 0.9316 - val_loss: 0.0424 - val_sparse_categorical_accuracy: 0.9856 Epoch 2/5 300/300 [==============================] - 5s 15ms/step - loss: 0.0366 - sparse_categorical_accuracy: 0.9896 - val_loss: 0.0357 - val_sparse_categorical_accuracy: 0.9885 Epoch 3/5 300/300 [==============================] - 5s 15ms/step - loss: 0.0204 - sparse_categorical_accuracy: 0.9937 - val_loss: 0.0391 - val_sparse_categorical_accuracy: 0.9887 Epoch 4/5 300/300 [==============================] - 5s 15ms/step - loss: 0.0140 - sparse_categorical_accuracy: 0.9951 - val_loss: 0.0457 - val_sparse_categorical_accuracy: 0.9868 Epoch 5/5 300/300 [==============================] - 5s 16ms/step - loss: 0.0104 - sparse_categorical_accuracy: 0.9967 - val_loss: 0.0519 - val_sparse_categorical_accuracy: 0.9859 <tensorflow.python.keras.callbacks.History at 0x7f16c5d965c0>
Entraînez un modèle à l'aide d'une boucle d'entraînement personnalisée.
Vous pouvez également créer et entraîner vos modèles en utilisant directement les API tf.function
et tf.distribute
. strategy.experimental_distribute_datasets_from_function
API strategy.experimental_distribute_datasets_from_function
est utilisée pour distribuer l'ensemble de données à partir d'une fonction d'ensemble de données. Notez que la taille de lot transmise à l'ensemble de données sera par taille de lot de réplique au lieu de la taille de lot globale dans ce cas. Pour en savoir plus, consultez le didacticiel Formation personnalisée avec tf.distribute.Strategy .
Commencez par créer le modèle, les ensembles de données et les tf.functions.
# Create the model, optimizer and metrics inside strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'training_accuracy', dtype=tf.float32)
# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync
train_dataset = strategy.experimental_distribute_datasets_from_function(
lambda _: get_dataset(per_replica_batch_size, is_training=True))
@tf.function
def train_step(iterator):
"""The step function for one training step"""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From <ipython-input-1-2f075cabff81>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function Warning:tensorflow:From <ipython-input-1-2f075cabff81>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function
Exécutez ensuite la boucle d'entraînement.
steps_per_eval = 10000 // batch_size
train_iterator = iter(train_dataset)
for epoch in range(5):
print('Epoch: {}/5'.format(epoch))
for step in range(steps_per_epoch):
train_step(train_iterator)
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
training_loss.reset_states()
training_accuracy.reset_states()
Epoch: 0/5 Current step: 300, training loss: 0.1313, accuracy: 95.91% Epoch: 1/5 Current step: 600, training loss: 0.0334, accuracy: 98.96% Epoch: 2/5 Current step: 900, training loss: 0.0189, accuracy: 99.42% Epoch: 3/5 Current step: 1200, training loss: 0.0131, accuracy: 99.57% Epoch: 4/5 Current step: 1500, training loss: 0.0104, accuracy: 99.64%
Amélioration des performances en plusieurs étapes dans tf.function
Les performances peuvent être améliorées en exécutant plusieurs étapes dans une fonction tf.function
. Ceci est réalisé en enveloppant l'appel de strategy.run
avec un tf.range
dans tf.function
, AutoGraph le convertira en un tf.while_loop
sur le worker TPU.
Bien qu'avec de meilleures performances, il existe des compromis à comparer avec une seule étape dans tf.function
. Exécuter plusieurs étapes dans une fonction tf.function
est moins flexible, vous ne pouvez pas exécuter les choses avec empressement ou du code python arbitraire dans les étapes.
@tf.function
def train_multiple_steps(iterator, steps):
"""The step function for one training step"""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
for _ in tf.range(steps):
strategy.run(step_fn, args=(next(iterator),))
# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0087, accuracy: 99.73%
Prochaines étapes
- Documentation Google Cloud TPU - Configurez et exécutez un Google Cloud TPU.
- Formation distribuée avec TensorFlow - Comment utiliser la stratégie de distribution et des liens vers de nombreux exemples illustrant les meilleures pratiques.
- Enregistrer / charger des modèles avec TensorFlow - Comment enregistrer et charger des modèles avec des stratégies de distribution.
- Modèles officiels TensorFlow - Exemples de modèles TensorFlow 2.x de pointe compatibles avec Cloud TPU.
- Le guide des performances de Google Cloud TPU - Améliorez encore les performances de Cloud TPU en ajustant les paramètres de configuration de Cloud TPU pour votre application.