Model.fit의 동작 사용자 정의하기

TensorFlow.org에서 보기 Google Colab에서 실행 View source on GitHub 노트북 다운로드

시작하기

감독 학습을 수행할 때 fit()를 사용할 수 있으며 모든 것이 원활하게 작동합니다.

훈련 루프를 처음부터 작성해야 하는 경우, GradientTape를 사용하여 모든 세부 사항을 제어할 수 있습니다.

그러나 사용자 정의 훈련 알고리즘이 필요하지만 콜백, 내장 배포 지원 또는 단계 융합과 같은 fit()의 편리한 특성을 계속 활용하려면 어떻게 해야 할까요?

Keras의 핵심 원칙은 복잡성의 점진적인 공개입니다. 항상 점진적으로 저수준 워크플로부터 시작할 수 있어야 합니다. 높은 수준의 기능이 자신의 사용 사례와 정확하게 일치하지 않다고 해서 절망할 필요는 없습니다. 적절한 수준의 고수준 편의를 유지하면서 작은 세부 사항을 보다 효과적으로 제어할 수 있어야 합니다.

fit()를 사용자 정의해야 하는 경우, Model 클래스의 훈련 단계 함수를 재정의해야 합니다. 이 함수는 모든 데이터 배치에 대해 fit()에 의해 호출되는 함수입니다. 그런 다음 평소와 같이 fit()을 호출 할 수 있으며 자체 학습 알고리즘을 실행합니다.

이 패턴은 Functional API를 사용하여 모델을 빌드하는 데 방해가 되지 않습니다. Sequential 모델, Functional API 모델, 또는 하위 클래스화된 모델과 관계없이 수행할 수 있습니다.

어떻게 동작하는지 살펴보겠습니다.

설정

TensorFlow 2.2 이상이 필요합니다.

import tensorflow as tf
from tensorflow import keras
2022-12-14 22:25:44.982786: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-12-14 22:25:44.982892: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
2022-12-14 22:25:44.982903: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.

첫 번째 간단한 예제

간단한 예제부터 시작하겠습니다.

  • keras.Model을 하위 클래스화하는 새 클래스를 만듭니다.
  • train_step(self, data) 메서드를 재정의합니다.
  • 손실을 포함하여 사전 매핑 메트릭 이름을 현재 값으로 반환합니다.

입력 인수 data는 훈련 데이터에 맞게 전달됩니다.

  • fit(x, y, ...)를 호출하여 Numpy 배열을 전달하면 data는 튜플 (x, y)가 됩니다.
  • tf.data.Dataset를 전달하는 경우, fit(dataset, ...)를 호출하여 data가 각 배치에서 dataset에 의해 산출됩니다.

train_step 메서드의 본문에서 이미 익숙한 것과 유사한 정기적인 훈련 업데이트를 구현합니다. 중요한 것은 self.compiled_loss를 통해 손실을 계산하여 compile()로 전달된 손실 함수를 래핑합니다.

마찬가지로, self.compiled_metrics.update_state(y, y_pred)를 호출하여 compile()에 전달된 메트릭의 상태를 업데이트하고, 마지막에 self.metrics의 결과를 쿼리하여 현재 값을 검색합니다.

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

다음을 시도해봅시다.

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 0.6662 - mae: 0.6898
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2882 - mae: 0.4303
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2313 - mae: 0.3891
<keras.callbacks.History at 0x7f54ec034e80>

더 낮은 수준으로 구성하기

당연히 compile()에서 손실 함수의 전달을 건너뛰고, 대신 train_step에서 수동으로 모두 수행할 수 있습니다. 메트릭도 마찬가지입니다.

다음은 옵티마이저를 구성하기 위해 compile()만 사용하는 하위 수준의 예입니다.

  • 먼저 손실과 MAE 점수를 추적하기 위해 Metric 인스턴스를 생성합니다.
  • (메트릭에 대한 update_state()를 호출하여) 메트릭의 상태를 업데이트하는 사용자 정의train_step()을 구현한 다음, 쿼리하여(result()를 통해) 현재 평균 값을 반환하여 진행률 표시줄에 표시되고 모든 콜백에 전달되도록 합니다.
  • 각 epoch 사이의 메트릭에 대해 reset_states()를 호출해야 합니다. 그렇지 않으면, result()를 호출하면 훈련 시작 이후부터 평균이 반환되지만, 일반적으로 epoch당 평균을 사용합니다. 다행히도 프레임워크에서는 다음과 같이 수행할 수 있습니다. 즉, 재설정하려는 매트릭을 모델의 metrics 속성에 나열하기만 하면 됩니다. 모델은 각 fit() epoch가 시작될 때 또는 evaluate() 호출이 시작될 때 여기에 나열된 모든 객체에 대해 reset_states()를 호출합니다.
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2728 - mae: 0.4312
Epoch 2/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2168 - mae: 0.3769
Epoch 3/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2082 - mae: 0.3701
Epoch 4/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2004 - mae: 0.3634
Epoch 5/5
32/32 [==============================] - 0s 2ms/step - loss: 0.1923 - mae: 0.3558
<keras.callbacks.History at 0x7f54e01be760>

sample_weightclass_weight 지원하기

첫 번째 기본 예제에서는 샘플 가중치에 대해 언급하지 않았습니다. fit() 인수 sample_weightclass_weight를 지원하려면 다음을 수행하면 됩니다.

  • data 인수에서 sample_weight 패키지를 풉니다.
  • compiled_losscompiled_metrics에 전달합니다(손실 및 메트릭을 위해 compile()에 의존하지 않는다면 수동으로 적용할 수도 있습니다).
  • 다음은 그 목록입니다.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 0.1389 - mae: 0.4280
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1296 - mae: 0.4126
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1229 - mae: 0.4022
<keras.callbacks.History at 0x7f54e0179c70>

자신만의 평가 단계 제공하기

model.evaluate() 호출에 대해 같은 작업을 수행하려면 어떻게 해야 할까요? 정확히 같은 방식으로 test_step을 재정의합니다. 다음과 같습니다.

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 2ms/step - loss: 1.1476 - mae: 0.9479
[1.147562026977539, 0.9478825926780701]

마무리: 엔드-투-엔드 GAN 예제

방금 배운 모든 내용을 활용하는 엔드 투 엔드 예제를 살펴보겠습니다.

다음을 고려합니다.

  • 생성기 네트워크는 28x28x1 이미지를 생성합니다.
  • discriminator 네트워크는 28x28x1 이미지를 두 개의 클래스("false" 및 "real")로 분류하기 위한 것입니다.
  • 각각 하나의 옵티마이저를 가집니다.
  • discriminator를 훈련하는 손실 함수입니다.
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

다음은 자신만의 서명을 사용하기 위해 compile()을 재정의하고 train_step 17줄로 전체 GAN 알고리즘을 구현하는 특성 완료형 GAN 클래스입니다.

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

테스트해 봅시다.

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 5s 15ms/step - d_loss: 0.4243 - g_loss: 0.8427
<keras.callbacks.History at 0x7f54cc0da700>

딥 러닝의 기본 개념은 간단합니다. 구현이 고통스러울 이유가 없습니다.