Cette page a été traduite par l'API Cloud Translation.
Switch to English

Ecrire une boucle d'entraînement à partir de zéro

Voir sur TensorFlow.org Exécuter dans Google Colab Afficher la source sur GitHub Télécharger le carnet

Installer

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

introduction

Keras fournit des boucles d'entraînement et d'évaluation par défaut, fit() et evaluate() . Leur utilisation est couverte dans le guide Formation et évaluation avec les méthodes intégrées .

Si vous souhaitez personnaliser l'algorithme d'apprentissage de votre modèle tout en tirant parti de la commodité de fit() (par exemple, pour entraîner un GAN à l'aide de fit() ), vous pouvez sous-classer la classe Model et implémenter votre propre méthode train_step() , qui est appelé à plusieurs reprises pendant fit() . Ceci est couvert dans le guide Personnaliser ce qui se passe dans fit() .

Maintenant, si vous voulez un contrôle de très bas niveau sur la formation et l'évaluation, vous devez rédiger vos propres boucles de formation et d'évaluation à partir de zéro. C'est le sujet de ce guide.

Utilisation du GradientTape : un premier exemple de bout en bout

L'appel d'un modèle dans une étendue GradientTape vous permet de récupérer les gradients des poids entraînables de la couche par rapport à une valeur de perte. À l'aide d'une instance d'optimisation, vous pouvez utiliser ces dégradés pour mettre à jour ces variables (que vous pouvez récupérer à l'aide de model.trainable_weights ).

Considérons un modèle MNIST simple:

inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

Entraînons-le en utilisant un gradient de mini-lot avec une boucle d'entraînement personnalisée.

Tout d'abord, nous allons avoir besoin d'un optimiseur, d'une fonction de perte et d'un ensemble de données:

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

Voici notre boucle de formation:

  • Nous ouvrons une boucle for qui itère sur les époques
  • Pour chaque époque, nous ouvrons une boucle for qui itère sur l'ensemble de données, par lots
  • Pour chaque lot, nous ouvrons une portée GradientTape()
  • Dans cette portée, nous appelons le modèle (forward pass) et calculons la perte
  • En dehors du périmètre, on récupère les gradients des poids du modèle par rapport à la perte
  • Enfin, nous utilisons l'optimiseur pour mettre à jour les poids du modèle en fonction des dégradés
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * 64))

Start of epoch 0
Training loss (for one batch) at step 0: 118.1961
Seen so far: 64 samples
Training loss (for one batch) at step 200: 2.1943
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.9606
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.9748
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.3943
Seen so far: 51264 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.4881
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.5267
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.6100
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.3300
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.3622
Seen so far: 51264 samples

Gestion de bas niveau des métriques

Ajoutons la surveillance des métriques à cette boucle de base.

Vous pouvez facilement réutiliser les métriques intégrées (ou les métriques personnalisées que vous avez écrites) dans de telles boucles de formation écrites à partir de zéro. Voici le flux:

  • Instanciez la métrique au début de la boucle
  • Appelez metric.update_state() après chaque lot
  • Appelez metric.result() lorsque vous devez afficher la valeur actuelle de la métrique
  • Appelez metric.reset_states() lorsque vous devez effacer l'état de la métrique (généralement à la fin d'une époque)

Utilisons ces connaissances pour calculer SparseCategoricalAccuracy sur les données de validation à la fin de chaque époque:

# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

# Prepare the training dataset.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)

Voici notre boucle de formation et d'évaluation:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * 64))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))

Start of epoch 0
Training loss (for one batch) at step 0: 88.3728
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.3446
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.7409
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.4098
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.7732
Seen so far: 51264 samples
Training acc over epoch: 0.7515
Validation acc: 0.8552
Time taken: 5.60s

Start of epoch 1
Training loss (for one batch) at step 0: 0.6751
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.5519
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.6730
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.2807
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.6229
Seen so far: 51264 samples
Training acc over epoch: 0.8577
Validation acc: 0.8908
Time taken: 5.62s

Accélérez votre étape d'entraînement avec tf.function

Le runtime par défaut dans TensorFlow 2.0 est une exécution rapide . En tant que tel, notre boucle d'entraînement ci-dessus s'exécute avec impatience.

C'est parfait pour le débogage, mais la compilation de graphes a un avantage certain en termes de performances. La description de votre calcul sous forme de graphique statique permet au framework d'appliquer des optimisations de performances globales. Ceci est impossible lorsque le framework est contraint d'exécuter avidement une opération après l'autre, sans aucune connaissance de ce qui va suivre.

Vous pouvez compiler dans un graphe statique toute fonction qui prend des tenseurs comme entrée. Ajoutez simplement un décorateur @tf.function dessus, comme ceci:

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

Faisons de même avec l'étape d'évaluation:

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    val_acc_metric.update_state(y, val_logits)

Maintenant, réexécutons notre boucle d'entraînement avec cette étape d'entraînement compilée:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * 64))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))

Start of epoch 0
Training loss (for one batch) at step 0: 0.7984
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.6274
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.2540
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.5385
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.2974
Seen so far: 51264 samples
Training acc over epoch: 0.8866
Validation acc: 0.9082
Time taken: 1.35s

Start of epoch 1
Training loss (for one batch) at step 0: 0.5614
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.7772
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5483
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.6677
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.5454
Seen so far: 51264 samples
Training acc over epoch: 0.9011
Validation acc: 0.9202
Time taken: 1.10s

Beaucoup plus vite, non?

Gestion de bas niveau des pertes suivies par le modèle

Les couches et modèles suivent de manière récursive toutes les pertes créées lors du passage vers l'avant par des couches qui appellent self.add_loss(value) . La liste résultante des valeurs de perte scalaire est disponible via la propriété model.losses à la fin de la passe avant.

Si vous souhaitez utiliser ces composants de perte, vous devez les additionner et les ajouter à la perte principale de votre étape d'entraînement.

Considérez cette couche, qui crée une perte de régularisation d'activité:

class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs

Construisons un modèle vraiment simple qui l'utilise:

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

Voici à quoi devrait ressembler notre étape de formation maintenant:

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # Add any extra losses created during the forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

Résumé

Vous savez maintenant tout ce qu'il y a à savoir sur l'utilisation des boucles d'entraînement intégrées et l'écriture des vôtres à partir de zéro.

Pour conclure, voici un exemple simple de bout en bout qui relie tout ce que vous avez appris dans ce guide: un DCGAN formé aux chiffres MNIST.

Exemple de bout en bout: une boucle d'entraînement GAN à partir de zéro

Vous connaissez peut-être les Réseaux Adversaires Génératifs (GAN). Les GAN peuvent générer de nouvelles images qui semblent presque réelles, en apprenant la distribution latente d'un ensemble de données d'apprentissage d'images («l'espace latent» des images).

Un GAN est composé de deux parties: un modèle "générateur" qui mappe des points dans l'espace latent à des points dans l'espace image, un modèle "discriminateur", un classificateur qui peut faire la différence entre des images réelles (issues de l'ensemble de données d'apprentissage) et des faux images (la sortie du réseau de générateurs).

Une boucle d'entraînement GAN ressemble à ceci:

1) Former le discriminateur. - Échantillonnez un lot de points aléatoires dans l'espace latent. - Transformez les points en fausses images via le modèle "générateur". - Obtenez un lot d'images réelles et combinez-les avec les images générées. - Former le modèle "discriminateur" pour classer les images générées par rapport aux images réelles.

2) Former le générateur. - Échantillonner des points aléatoires dans l'espace latent. - Transformez les points en fausses images via le réseau "générateur". - Obtenez un lot d'images réelles et combinez-les avec les images générées. - Former le modèle "générateur" pour "tromper" le discriminateur et classer les fausses images comme réelles.

Pour un aperçu beaucoup plus détaillé du fonctionnement des GAN, consultez Deep Learning avec Python .

Implémentons cette boucle d'entraînement. Tout d'abord, créez le discriminateur destiné à classer les faux chiffres par rapport aux vrais:

discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()
Model: "discriminator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 14, 14, 64)        640       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 7, 7, 128)         0         
_________________________________________________________________
global_max_pooling2d (Global (None, 128)               0         
_________________________________________________________________
dense_4 (Dense)              (None, 1)                 129       
=================================================================
Total params: 74,625
Trainable params: 74,625
Non-trainable params: 0
_________________________________________________________________

Créons ensuite un réseau de générateurs, qui transforme les vecteurs latents en sorties de forme (28, 28, 1) (représentant les chiffres MNIST):

latent_dim = 128

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

Voici le bit clé: la boucle d'entraînement. Comme vous pouvez le voir, c'est assez simple. La fonction d'étape d'entraînement ne prend que 17 lignes.

# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


@tf.function
def train_step(real_images):
    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Decode them to fake images
    generated_images = generator(random_latent_vectors)
    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(labels.shape)

    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = discriminator(combined_images)
        d_loss = loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, generator.trainable_weights)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
    return d_loss, g_loss, generated_images

train_step notre GAN, en appelant à plusieurs reprises train_step sur des lots d'images.

Puisque notre discriminateur et notre générateur sont des convnets, vous allez vouloir exécuter ce code sur un GPU.

import os

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 1  # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Train the discriminator & generator on one batch of real images.
        d_loss, g_loss, generated_images = train_step(real_images)

        # Logging.
        if step % 200 == 0:
            # Print metrics
            print("discriminator loss at step %d: %.2f" % (step, d_loss))
            print("adversarial loss at step %d: %.2f" % (step, g_loss))

            # Save one generated image
            img = tf.keras.preprocessing.image.array_to_img(
                generated_images[0] * 255.0, scale=False
            )
            img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))

        # To limit execution time we stop after 10 steps.
        # Remove the lines below to actually train the model!
        if step > 10:
            break

Start epoch 0
discriminator loss at step 0: 0.68
adversarial loss at step 0: 0.69

C'est tout! Vous obtiendrez de jolis faux chiffres MNIST après seulement environ 30 secondes d'entraînement sur le GPU Colab.