Personaliza lo que sucede en Model.fit

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Introducción

Cuando estás haciendo el aprendizaje supervisado, se puede utilizar fit() y todo funciona sin problemas.

Cuando se necesita para escribir su propio bucle de formación a partir de cero, se puede utilizar el GradientTape y tomar el control de todos los detalles.

Pero lo que si usted necesita un algoritmo de entrenamiento a medida, pero todavía quiere beneficiarse de las características convenientes de fit() , tales como las devoluciones de llamada, una función de apoyo a la distribución, o la fusión paso?

Un principio básico de Keras es la revelación progresiva de la complejidad. Siempre debe poder ingresar a flujos de trabajo de nivel inferior de manera gradual. No debe caer por un precipicio si la funcionalidad de alto nivel no coincide exactamente con su caso de uso. Debería poder obtener más control sobre los pequeños detalles mientras conserva una cantidad proporcional de conveniencia de alto nivel.

Cuando tenga que personalizar lo que fit() lleva, deberá anular la función de paso de formación del Model de clases. Esta es la función que se llama por fit() para cada lote de datos. A continuación, será capaz de llamada de fit() , como de costumbre - y se va a ejecutar su propio algoritmo de aprendizaje.

Tenga en cuenta que este patrón no le impide crear modelos con la API funcional. Usted puede hacer esto si usted está construyendo Sequential modelos, modelos funcionales de la API, o modelos subclases.

Veamos cómo funciona.

Configuración

Requiere TensorFlow 2.2 o posterior.

import tensorflow as tf
from tensorflow import keras

Un primer ejemplo sencillo

Comencemos con un ejemplo simple:

  • Creamos una nueva clase que subclases keras.Model .
  • Acabamos de reemplazar el método train_step(self, data) .
  • Devolvemos los nombres de las métricas de asignación de un diccionario (incluida la pérdida) a su valor actual.

El argumento de entrada data es lo que se pasa a encajar como datos de entrenamiento:

  • Si pasa matrices numpy, llamando fit(x, y, ...) , a continuación, data serán la tupla (x, y)
  • Si pasa un tf.data.Dataset , llamando fit(dataset, ...) , a continuación, data serán lo que se produjo por el dataset en cada lote.

En el cuerpo de la train_step método, se implementa una actualización de la formación regular, similar a lo que ya está familiarizado con. Es importante destacar que, se calcula la pérdida a través de self.compiled_loss , que envuelve la función de pérdida (s) (s) que se pasa al compile() .

Del mismo modo, llamamos self.compiled_metrics.update_state(y, y_pred) para actualizar el estado de los indicadores que se aprobaron en compile() , y los resultados de la consulta self.metrics al final recuperamos a su valor actual.

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

Probemos esto:

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 0.9909 - mae: 0.8601
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.4363 - mae: 0.5345
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2906 - mae: 0.4311
<keras.callbacks.History at 0x7f5ad1ca1090>

Yendo a un nivel inferior

Naturalmente, usted podría saltar pasando una función de pérdida de compile() , y en lugar de hacer todo manualmente en train_step . Lo mismo ocurre con las métricas.

He aquí un ejemplo de nivel inferior, que sólo utiliza compile() para configurar el optimizador:

  • Comenzamos creando Metric instancias para realizar un seguimiento de nuestra pérdida y una puntuación MAE.
  • Ponemos en práctica una costumbre train_step() que actualiza el estado de estas métricas (llamando update_state() sobre ellos), entonces se le pregunta ellos (a través de result() ) para devolver su valor medio actual, que se muestra por la barra de progreso y para ser pasar a cualquier devolución de llamada.
  • Tenga en cuenta que tendríamos que llamar reset_states() en nuestras métricas entre cada época! De lo contrario llamar result() devolvería un promedio desde el inicio de la formación, mientras que por lo general trabajan con promedios por cada época. Afortunadamente, el marco puede hacer eso por nosotros: simplemente indica alguna métrica que desea restablecer en el metrics propiedad del modelo. El modelo llamará reset_states() en cualquier objeto que aparece aquí en el comienzo de cada fit() época o al comienzo de una llamada a evaluate() .
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 1ms/step - loss: 1.5969 - mae: 1.1523
Epoch 2/5
32/32 [==============================] - 0s 1ms/step - loss: 0.7352 - mae: 0.7310
Epoch 3/5
32/32 [==============================] - 0s 1ms/step - loss: 0.3830 - mae: 0.4999
Epoch 4/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2809 - mae: 0.4215
Epoch 5/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2590 - mae: 0.4058
<keras.callbacks.History at 0x7f5ad1b62c50>

Apoyando sample_weight y class_weight

Es posible que haya notado que nuestro primer ejemplo básico no mencionó la ponderación de la muestra. Si quieres apoyar el fit() argumentos sample_weight y class_weight , usted sólo tiene que hacer lo siguiente:

  • Desempaquetar sample_weight Del data argumento
  • Pasarlo a compiled_loss y compiled_metrics (por supuesto, usted podría también acaba de aplicar de forma manual si no se basan en compile() por las pérdidas y métricas)
  • Eso es. Esa es la lista.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1365 - mae: 0.4196
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1285 - mae: 0.4068
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1212 - mae: 0.3971
<keras.callbacks.History at 0x7f5ad1ba64d0>

Proporcionar su propio paso de evaluación

¿Qué pasa si usted quiere hacer lo mismo para las llamadas a model.evaluate() ? De allí tendría que anular test_step exactamente de la misma manera. Así es como se ve:

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 2.7584 - mae: 1.5920
[2.758362054824829, 1.59201979637146]

Conclusión: un ejemplo de GAN de un extremo a otro

Analicemos un ejemplo de principio a fin que aprovecha todo lo que acaba de aprender.

Consideremos:

  • Una red de generadores destinada a generar imágenes de 28x28x1.
  • Una red discriminadora destinada a clasificar imágenes de 28x28x1 en dos clases ("falsas" y "reales").
  • Un optimizador para cada uno.
  • Una función de pérdida para entrenar al discriminador.
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

Aquí está una clase GAN-función completa, haciendo caso omiso compile() para usar su propia firma, y la aplicación de todo el algoritmo de GAN en 17 líneas en train_step :

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

Probémoslo:

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 3s 11ms/step - d_loss: 0.4031 - g_loss: 0.9305
<keras.callbacks.History at 0x7f5ad1b37c50>

Las ideas detrás del aprendizaje profundo son simples, entonces, ¿por qué debería ser dolorosa su implementación?