Esta página foi traduzida pela API Cloud Translation.
Switch to English

Personalize o que acontece em Model.fit

Ver no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Introdução

Quando estiver fazendo um aprendizado supervisionado, você pode usar fit() e tudo funcionará perfeitamente.

Quando você precisar escrever seu próprio loop de treinamento do zero, poderá usar o GradientTape e assumir o controle de cada pequeno detalhe.

Mas e se você precisar de um algoritmo de treinamento personalizado, mas ainda quiser se beneficiar dos recursos convenientes de fit() , como retornos de chamada, suporte de distribuição integrado ou fusão por etapas?

Um princípio fundamental de Keras é a divulgação progressiva da complexidade . Você sempre deve conseguir entrar em fluxos de trabalho de nível inferior de maneira gradual. Você não deve cair de um penhasco se a funcionalidade de alto nível não corresponder exatamente ao seu caso de uso. Você deve ser capaz de obter mais controle sobre os pequenos detalhes enquanto mantém uma quantidade proporcional de conveniência de alto nível.

Quando você precisa personalizar o que fit() faz, você deve substituir a função da etapa de treinamento da classe Model . Esta é a função chamada por fit() para cada lote de dados. Você poderá então chamar fit() como de costume - e ele executará seu próprio algoritmo de aprendizagem.

Observe que esse padrão não o impede de construir modelos com a API Funcional. Você pode fazer isso se estiver construindo modelos Sequential , modelos de API funcionais ou modelos de subclasse.

Vamos ver como isso funciona.

Configuração

Requer TensorFlow 2.2 ou posterior.

import tensorflow as tf
from tensorflow import keras

Um primeiro exemplo simples

Vamos começar com um exemplo simples:

  • Criamos uma nova classe que subclasses keras.Model .
  • Nós apenas substituímos o método train_step(self, data) .
  • Retornamos nomes de métricas de mapeamento de dicionário (incluindo a perda) para seu valor atual.

O argumento de entrada data é o que é passado para caber como dados de treinamento:

  • Se você passar matrizes Numpy, chamando fit(x, y, ...) , os data serão a tupla (x, y)
  • Se você passar umtf.data.Dataset , chamando fit(dataset, ...) , então os data serão o que é gerado pelo dataset em cada lote.

No corpo do método train_step , implementamos uma atualização de treinamento regular, semelhante ao que você já está familiarizado. É importante self.compiled_loss que calculamos a perda por meio de self.compiled_loss , que envolve a (s) função (ões) de perda que foram passadas para compile() .

Da mesma forma, chamamos self.compiled_metrics.update_state(y, y_pred) para atualizar o estado das métricas que foram passadas em compile() e consultamos os resultados de self.metrics no final para recuperar seu valor atual.

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

Vamos experimentar:

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 1.3139 - mae: 1.0328
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.5631 - mae: 0.6313
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.3037 - mae: 0.4481

<tensorflow.python.keras.callbacks.History at 0x7f62580e46d8>

Indo de nível inferior

Naturalmente, você poderia simplesmente pular a passagem de uma função de perda em compile() e, em vez disso, fazer tudo manualmente em train_step . Da mesma forma para métricas.

Aqui está um exemplo de nível inferior, que usa apenas compile() para configurar o otimizador:

  • Começamos criando instâncias do Metric para rastrear nossa perda e uma pontuação do MAE.
  • Implementamos um train_step() personalizado que atualiza o estado dessas métricas (chamando update_state() nelas) e, em seguida, as consultamos (via result() ) para retornar seu valor médio atual, a ser exibido pela barra de progresso e ser passar para qualquer retorno de chamada.
  • Observe que precisaríamos chamar reset_states() em nossas métricas entre cada época! Caso contrário, chamar result() retornaria uma média desde o início do treinamento, enquanto geralmente trabalhamos com médias por época. Felizmente, a estrutura pode fazer isso por nós: apenas liste qualquer métrica que você deseja redefinir na propriedade metrics do modelo. O modelo chamará reset_states() em qualquer objeto listado aqui no início de cada época fit() ou no início de uma chamada para evaluate() .
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)

Epoch 1/5
32/32 [==============================] - 0s 2ms/step - loss: 0.3635 - mae: 0.4892
Epoch 2/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2115 - mae: 0.3722
Epoch 3/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2051 - mae: 0.3649
Epoch 4/5
32/32 [==============================] - 0s 1ms/step - loss: 0.1999 - mae: 0.3605
Epoch 5/5
32/32 [==============================] - 0s 1ms/step - loss: 0.1945 - mae: 0.3556

<tensorflow.python.keras.callbacks.History at 0x7f624c0a66a0>

Suportando sample_weight e class_weight

Você deve ter notado que nosso primeiro exemplo básico não fez nenhuma menção ao peso da amostra. Se quiser oferecer suporte aos argumentos fit() sample_weight e class_weight , basta fazer o seguinte:

  • Descompacte sample_weight do argumento de data
  • Passe para compiled_loss & compiled_metrics (claro, você também pode aplicá-lo manualmente se você não depende de compile() para perdas e métricas)
  • É isso aí. Essa é a lista.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 0.4626 - mae: 0.8329
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2033 - mae: 0.5283
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1421 - mae: 0.4378

<tensorflow.python.keras.callbacks.History at 0x7f62401c6198>

Fornecendo sua própria etapa de avaliação

E se você quiser fazer o mesmo para chamadas para model.evaluate() ? Então você sobrescreveria test_step exatamente da mesma maneira. Esta é a aparência:

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 0.2375 - mae: 0.3944

[0.2375321239233017, 0.39438238739967346]

Resumindo: um exemplo de GAN de ponta a ponta

Vamos examinar um exemplo completo que aproveita tudo o que você acabou de aprender.

Vamos considerar:

  • Uma rede geradora destinada a gerar imagens 28x28x1.
  • Uma rede discriminadora destinada a classificar imagens 28x28x1 em duas classes ("falsas" e "reais").
  • Um otimizador para cada.
  • Uma função de perda para treinar o discriminador.
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

Aqui está uma classe GAN completa de recursos, substituindo compile() para usar sua própria assinatura e implementando todo o algoritmo GAN em 17 linhas em train_step :

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

Vamos testá-lo:

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
100/100 [==============================] - 1s 11ms/step - d_loss: 0.3923 - g_loss: 1.2622

<tensorflow.python.keras.callbacks.History at 0x7f625af136a0>

As ideias por trás do aprendizado profundo são simples, então por que sua implementação deveria ser dolorosa?