이 페이지는 Cloud Translation API를 통해 번역되었습니다.
Switch to English

Model.fit에서 일어나는 일을 사용자 지정

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

소개

지도 학습을 할 때 fit() 사용할 수 있으며 모든 것이 원활하게 작동합니다.

자신 만의 트레이닝 루프를 처음부터 작성해야 할 때 GradientTape 사용하여 모든 세부 사항을 제어 할 수 있습니다.

하지만 맞춤 학습 알고리즘이 필요하지만 콜백, 기본 제공 배포 지원 또는 단계 융합과 같은 fit() 의 편리한 기능을 계속 활용하려면 어떻게해야할까요?

Keras의 핵심 원칙 은 복잡성을 점진적으로 공개하는 것입니다 . 점진적인 방법으로 항상 하위 수준의 워크 플로에 들어갈 수 있어야합니다. 높은 수준의 기능이 사용 사례와 정확히 일치하지 않는 경우 절벽에서 떨어지지 않아야합니다. 높은 수준의 편의성을 유지하면서 작은 세부 사항을 더 잘 제어 할 수 있어야합니다.

fit() 을 사용자 정의해야하는 경우 Model 클래스의 학습 단계 함수를 재정의 해야합니다. 이것은 모든 데이터 배치에 대해 fit() 에 의해 호출되는 함수입니다. 그러면 평소처럼 fit() 을 호출 할 수 있으며 자체 학습 알고리즘을 실행합니다.

이 패턴은 Functional API를 사용하여 모델을 빌드하는 것을 방해하지 않습니다. Sequential 모델, 기능적 API 모델 또는 하위 클래스 모델을 빌드하든 관계없이이를 수행 할 수 있습니다.

어떻게 작동하는지 봅시다.

설정

TensorFlow 2.2 이상이 필요합니다.

import tensorflow as tf
from tensorflow import keras

첫 번째 간단한 예

간단한 예부터 시작하겠습니다.

  • keras.Model 하위 클래스를 만드는 새 클래스를 keras.Model .
  • 우리는 train_step(self, data) 메서드를 재정의합니다.
  • 딕셔너리 매핑 메트릭 이름 (손실 포함)을 현재 값으로 반환합니다.

입력 인수 data 는 훈련 데이터에 맞게 전달됩니다.

  • fit(x, y, ...) 를 호출하여 Numpy 배열을 전달하면 data 가 튜플 (x, y)
  • fit(dataset, ...) 를 호출하여 tf.data.Dataset 을 전달하면 data 가 각 배치에서 dataset 에 의해 산출됩니다.

train_step 메서드의 본문에서 이미 익숙한 것과 유사한 정기적 인 교육 업데이트를 구현합니다. 중요한 것은 self.compiled_loss 를 통해 손실을 계산하는 self.compiled_loss , 이는 compile() 전달 된 loss (es) 함수를 래핑합니다.

마찬가지로 self.compiled_metrics.update_state(y, y_pred) 를 호출하여 compile() 에서 전달 된 메트릭의 상태를 업데이트하고 끝에 self.metrics 결과를 쿼리하여 현재 값을 검색합니다.

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

이것을 시도해 봅시다 :

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 1.3139 - mae: 1.0328
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.5631 - mae: 0.6313
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.3037 - mae: 0.4481

<tensorflow.python.keras.callbacks.History at 0x7f62580e46d8>

낮은 수준으로 이동

당연히 compile() 에서 손실 함수 전달을 건너 뛰고 대신 train_step 에서 수동으로 모든 작업을 수행 할 수 있습니다 . 메트릭도 마찬가지입니다.

다음은 최적화 프로그램을 구성하기 위해 compile() 만 사용하는 하위 수준의 예입니다.

  • 먼저 손실과 MAE 점수를 추적하기 위해 Metric 인스턴스를 생성합니다.
  • 우리는 사용자 정의 구현 train_step() (호출하여 이러한 메트릭의 상태를 업데이트 update_state() 다음, 그들에)을 (를 통해 그들을 쿼리 result() 현재의 평균 값을 반환), 진행률 표시 줄에 의해 할 수 표시 할 모든 콜백에 전달합니다.
  • reset_states() 사이의 메트릭에 대해 reset_states() 를 호출해야합니다! 그렇지 않으면 result() 호출 result() 훈련 시작 이후 평균이 반환되지만 일반적으로 에포크 당 평균으로 작업합니다. 고맙게도 프레임 워크는이를 수행 할 수 있습니다. 재설정하려는 메트릭을 모델의 metrics 속성에 나열하기 만하면됩니다. 모델은 호출 reset_states() 각의 시작에 여기에 나열된 모든 객체에 fit() 하는 순간 또는 전화의 시작으로 evaluate() .
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)

Epoch 1/5
32/32 [==============================] - 0s 2ms/step - loss: 0.3635 - mae: 0.4892
Epoch 2/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2115 - mae: 0.3722
Epoch 3/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2051 - mae: 0.3649
Epoch 4/5
32/32 [==============================] - 0s 1ms/step - loss: 0.1999 - mae: 0.3605
Epoch 5/5
32/32 [==============================] - 0s 1ms/step - loss: 0.1945 - mae: 0.3556

<tensorflow.python.keras.callbacks.History at 0x7f624c0a66a0>

sample_weightclass_weight 지원

첫 번째 기본 예에서 샘플 가중치에 대해 언급하지 않은 것을 눈치 채 셨을 것입니다. fit() 인수 sample_weightclass_weight 를 지원하려면 다음을 수행하면됩니다.

  • data 인수에서 sample_weight 압축 sample_weight
  • 에 전달 compiled_losscompiled_metrics (당신이에 의존하지 않는 경우 물론, 당신은 또한 단지 수동으로 적용 할 수 compile() 손실 및 측정을위한)
  • 그게 다야. 그게 목록입니다.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 0.4626 - mae: 0.8329
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2033 - mae: 0.5283
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1421 - mae: 0.4378

<tensorflow.python.keras.callbacks.History at 0x7f62401c6198>

자신의 평가 단계 제공

model.evaluate() 호출에 대해 동일한 작업을 수행하려면 어떻게해야합니까? 그런 다음 정확히 동일한 방식으로 test_step 을 재정의합니다. 다음과 같이 표시됩니다.

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 0.2375 - mae: 0.3944

[0.2375321239233017, 0.39438238739967346]

마무리 : 엔드-투-엔드 GAN 예제

방금 배운 모든 내용을 활용하는 종단 간 예제를 살펴 보겠습니다.

고려해 봅시다 :

  • 28x28x1 이미지를 생성하기위한 생성기 네트워크.
  • 판별 네트워크는 28x28x1 이미지를 두 가지 클래스 ( "가짜"및 "실제")로 분류하기위한 것입니다.
  • 각각에 대해 하나의 최적화 프로그램.
  • 판별자를 훈련하는 손실 함수입니다.
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

다음은 기능이 완전한 GAN 클래스로, 자체 서명을 사용하도록 compile() 을 재정의하고 train_step 17 줄에 전체 GAN 알고리즘을 구현합니다.

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

시운전 해 보겠습니다.

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
100/100 [==============================] - 1s 11ms/step - d_loss: 0.3923 - g_loss: 1.2622

<tensorflow.python.keras.callbacks.History at 0x7f625af136a0>

딥 러닝이면의 아이디어는 간단합니다. 그렇다면 구현이 왜 고통 스러울까요?