¿Tengo una pregunta? Conéctese con la comunidad en el Foro de visita del foro de TensorFlow

Escribir un ciclo de entrenamiento desde cero

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Configuración

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

Introducción

Keras proporciona ciclos predeterminados de entrenamiento y evaluación, fit() y evaluate() . Su uso está cubierto en la guía Capacitación y evaluación con los métodos incorporados .

Si desea personalizar el algoritmo de aprendizaje de su modelo mientras sigue aprovechando la conveniencia de fit() (por ejemplo, para entrenar un GAN usando fit() ), puede subclasificar la clase Model e implementar su propio método train_step() , que se llama repetidamente durante el fit() . Esto se trata en la guía Personalización de lo que sucede en el fit() .

Ahora, si desea un control de muy bajo nivel sobre la capacitación y la evaluación, debe escribir sus propios ciclos de capacitación y evaluación desde cero. De esto se trata esta guía.

Usando GradientTape : un primer ejemplo de extremo a extremo

Llamar a un modelo dentro de un alcance GradientTape permite recuperar los gradientes de los pesos entrenables de la capa con respecto a un valor de pérdida. Usando una instancia de optimizador, puede usar estos gradientes para actualizar estas variables (que puede recuperar usando model.trainable_weights ).

Consideremos un modelo MNIST simple:

inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

Entreneémoslo usando un gradiente de mini lotes con un ciclo de entrenamiento personalizado.

Primero, necesitaremos un optimizador, una función de pérdida y un conjunto de datos:

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)

Aquí está nuestro ciclo de entrenamiento:

  • Abrimos un bucle for que itera sobre épocas.
  • Para cada época, abrimos un bucle for que itera sobre el conjunto de datos, en lotes
  • Para cada lote, abrimos un alcance GradientTape()
  • Dentro de este alcance, llamamos al modelo (pase hacia adelante) y calculamos la pérdida
  • Fuera del alcance, recuperamos los gradientes de los pesos del modelo con respecto a la pérdida
  • Finalmente, usamos el optimizador para actualizar los pesos del modelo en función de los gradientes.
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * batch_size))
Start of epoch 0
Training loss (for one batch) at step 0: 153.8545
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.4767
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.4645
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.7049
Seen so far: 38464 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.9202
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.8473
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.6632
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.8758
Seen so far: 38464 samples

Manejo de métricas de bajo nivel

Agreguemos monitoreo de métricas a este ciclo básico.

Puede reutilizar fácilmente las métricas integradas (o las personalizadas que escribió) en estos ciclos de entrenamiento escritos desde cero. Aquí está el flujo:

  • Cree una instancia de la métrica al comienzo del ciclo
  • Llame a metric.update_state() después de cada lote
  • Llame a metric.result() cuando necesite mostrar el valor actual de la métrica
  • Llame a metric.reset_states() cuando necesite borrar el estado de la métrica (generalmente al final de una época)

SparseCategoricalAccuracy este conocimiento para calcular SparseCategoricalAccuracy en los datos de validación al final de cada época:

# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

Aquí está nuestro ciclo de capacitación y evaluación:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 114.3453
Seen so far: 64 samples
Training loss (for one batch) at step 200: 2.2635
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5206
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 1.0906
Seen so far: 38464 samples
Training acc over epoch: 0.7022
Validation acc: 0.7853
Time taken: 5.38s

Start of epoch 1
Training loss (for one batch) at step 0: 0.5879
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.9477
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.4649
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.6874
Seen so far: 38464 samples
Training acc over epoch: 0.8114
Validation acc: 0.8291
Time taken: 5.46s

tf.function tu paso de entrenamiento con tf.function

El tiempo de ejecución predeterminado en TensorFlow 2.0 es una ejecución ávida . Como tal, nuestro ciclo de entrenamiento anterior se ejecuta con entusiasmo.

Esto es excelente para la depuración, pero la compilación de gráficos tiene una clara ventaja de rendimiento. Describir su cálculo como un gráfico estático permite que el marco aplique optimizaciones de rendimiento global. Esto es imposible cuando el marco se ve obligado a ejecutar con avidez una operación tras otra, sin conocimiento de lo que viene a continuación.

Puede compilar en un gráfico estático cualquier función que tome tensores como entrada. Simplemente agregue un decorador @tf.function en él, como este:

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

Hagamos lo mismo con el paso de evaluación:

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    val_acc_metric.update_state(y, val_logits)

Ahora, volvamos a ejecutar nuestro ciclo de entrenamiento con este paso de entrenamiento compilado:

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 0.4854
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.5259
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5035
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.2240
Seen so far: 38464 samples
Training acc over epoch: 0.8502
Validation acc: 0.8616
Time taken: 1.32s

Start of epoch 1
Training loss (for one batch) at step 0: 0.6278
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.3667
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.3374
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.5318
Seen so far: 38464 samples
Training acc over epoch: 0.8709
Validation acc: 0.8720
Time taken: 1.02s

Mucho más rápido, ¿no?

Manejo a bajo nivel de pérdidas rastreadas por el modelo

Las capas y modelos rastrean de forma recursiva cualquier pérdida creada durante el paso hacia adelante por capas que llaman a self.add_loss(value) . La lista resultante de valores de pérdida escalar está disponible a través del model.losses propiedad. model.losses al final de la pasada hacia adelante.

Si desea utilizar estos componentes de pérdida, debe sumarlos y agregarlos a la pérdida principal en su paso de entrenamiento.

Considere esta capa, que crea una pérdida de regularización de actividad:

class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs

Construyamos un modelo realmente simple que lo use:

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

Así es como debería verse nuestro paso de entrenamiento ahora:

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # Add any extra losses created during the forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

Resumen

Ahora ya sabe todo lo que hay que saber sobre el uso de bucles de entrenamiento integrados y la escritura de los suyos propios desde cero.

Para concluir, aquí hay un ejemplo simple de extremo a extremo que une todo lo que ha aprendido en esta guía: un DCGAN entrenado en dígitos MNIST.

Ejemplo de extremo a extremo: un ciclo de entrenamiento GAN desde cero

Es posible que esté familiarizado con las redes generativas de confrontación (GAN). Las GAN pueden generar nuevas imágenes que parecen casi reales, al aprender la distribución latente de un conjunto de datos de entrenamiento de imágenes (el "espacio latente" de las imágenes).

Una GAN consta de dos partes: un modelo "generador" que asigna puntos en el espacio latente a puntos en el espacio de la imagen, un modelo "discriminador", un clasificador que puede diferenciar entre imágenes reales (del conjunto de datos de entrenamiento) y falsas. imágenes (la salida de la red del generador).

Un ciclo de entrenamiento GAN se ve así:

1) Entrena al discriminador. - Muestra un lote de puntos aleatorios en el espacio latente. - Convierta los puntos en imágenes falsas a través del modelo "generador". - Obtenga un lote de imágenes reales y combínelas con las imágenes generadas. - Entrenar el modelo "discriminador" para clasificar imágenes generadas vs. reales.

2) Entrene al generador. - Muestra de puntos aleatorios en el espacio latente. - Convierta los puntos en imágenes falsas a través de la red "generadora". - Obtenga un lote de imágenes reales y combínelas con las imágenes generadas. - Entrenar al modelo "generador" para "engañar" al discriminador y clasificar las imágenes falsas como reales.

Para obtener una descripción general mucho más detallada de cómo funcionan las GAN, consulte Aprendizaje profundo con Python .

Implementemos este ciclo de entrenamiento. Primero, cree el discriminador destinado a clasificar dígitos falsos frente a dígitos reales:

discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()
Model: "discriminator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 14, 14, 64)        640       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 7, 7, 128)         0         
_________________________________________________________________
global_max_pooling2d (Global (None, 128)               0         
_________________________________________________________________
dense_4 (Dense)              (None, 1)                 129       
=================================================================
Total params: 74,625
Trainable params: 74,625
Non-trainable params: 0
_________________________________________________________________

Luego, creemos una red generadora, que convierta los vectores latentes en salidas de forma (28, 28, 1) (que representan dígitos MNIST):

latent_dim = 128

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

Aquí está la parte clave: el ciclo de entrenamiento. Como puede ver, es bastante sencillo. La función de paso de entrenamiento solo ocupa 17 líneas.

# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


@tf.function
def train_step(real_images):
    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Decode them to fake images
    generated_images = generator(random_latent_vectors)
    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(labels.shape)

    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = discriminator(combined_images)
        d_loss = loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, generator.trainable_weights)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
    return d_loss, g_loss, generated_images

train_step nuestro GAN, llamando repetidamente a train_step en lotes de imágenes.

Dado que nuestro discriminador y generador son convnets, querrá ejecutar este código en una GPU.

import os

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 1  # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Train the discriminator & generator on one batch of real images.
        d_loss, g_loss, generated_images = train_step(real_images)

        # Logging.
        if step % 200 == 0:
            # Print metrics
            print("discriminator loss at step %d: %.2f" % (step, d_loss))
            print("adversarial loss at step %d: %.2f" % (step, g_loss))

            # Save one generated image
            img = tf.keras.preprocessing.image.array_to_img(
                generated_images[0] * 255.0, scale=False
            )
            img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))

        # To limit execution time we stop after 10 steps.
        # Remove the lines below to actually train the model!
        if step > 10:
            break
Start epoch 0
discriminator loss at step 0: 0.68
adversarial loss at step 0: 0.67

¡Eso es! Obtendrá dígitos MNIST falsos de buen aspecto después de solo ~ 30 segundos de entrenamiento en la GPU Colab.