이 페이지는 Cloud Translation API를 통해 번역되었습니다.
Switch to English

처음부터 훈련 루프 작성

TensorFlow.org에서보기 Google Colab에서 실행 GitHub에서 소스보기 노트북 다운로드

설정

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

소개

Keras는 기본 훈련 및 평가 루프 인 fit()evaluate() 합니다. 사용법은 기본 제공 방법을 사용한 교육 및 평가 가이드에서 다룹니다.

fit() 의 편리함을 활용하면서 모델의 학습 알고리즘을 사용자 정의하려면 (예를 들어 fit() 사용하여 GAN을 훈련 시키려면) Model 클래스를 하위 클래스 화하고 고유 한 train_step() 메서드를 구현할 수 있습니다. fit() 동안 반복적으로 호출됩니다. 이것은 fit() 에서 일어나는 일 사용자 정의 가이드에서 다룹니다.

이제 훈련 및 평가에 대한 매우 낮은 수준의 제어를 원한다면 처음부터 자신의 훈련 및 평가 루프를 작성해야합니다. 이것이이 가이드의 내용입니다.

GradientTape 사용 : 첫 번째 종단 간 예제

GradientTape 범위 내에서 모델을 호출하면 손실 값과 관련하여 학습 가능한 레이어 가중치의 기울기를 검색 할 수 있습니다. 옵티 마이저 인스턴스를 사용하면 이러한 기울기를 사용하여 이러한 변수를 업데이트 할 수 있습니다 ( model.trainable_weights 사용하여 검색 할 수 model.trainable_weights ).

간단한 MNIST 모델을 고려해 보겠습니다.

inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

맞춤 학습 루프가있는 미니 배치 그래디언트를 사용하여 학습 해 보겠습니다.

먼저 옵티 마이저, 손실 함수 및 데이터 세트가 필요합니다.

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

다음은 교육 루프입니다.

  • 에포크를 반복하는 for 루프를 엽니 다.
  • 각 Epoch에 대해 데이터 세트를 일괄 적으로 반복하는 for 루프를 엽니 다.
  • 각 배치에 대해 GradientTape() 범위를 엽니 다.
  • 이 범위 내에서 모델 (순방향 전달)을 호출하고 손실을 계산합니다.
  • 범위 밖에서 손실과 관련하여 모델 가중치의 기울기를 검색합니다.
  • 마지막으로 최적화기를 사용하여 기울기를 기반으로 모델의 가중치를 업데이트합니다.
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * 64))

Start of epoch 0
Training loss (for one batch) at step 0: 118.1961
Seen so far: 64 samples
Training loss (for one batch) at step 200: 2.1943
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.9606
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.9748
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.3943
Seen so far: 51264 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.4881
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.5267
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.6100
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.3300
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.3622
Seen so far: 51264 samples

낮은 수준의 메트릭 처리

이 기본 루프에 메트릭 모니터링을 추가해 보겠습니다.

처음부터 작성된 교육 루프에서 기본 제공 메트릭 (또는 사용자가 작성한 사용자 지정 메트릭)을 쉽게 재사용 할 수 있습니다. 흐름은 다음과 같습니다.

  • 루프 시작시 메트릭 인스턴스화
  • 각 배치 후 metric.update_state() 호출
  • 메트릭의 현재 값을 표시해야하는 경우 metric.result() 호출하십시오.
  • 메트릭의 상태를 지워야 할 때 (일반적으로 epoch의 끝에서 metric.reset_states() 호출 metric.reset_states()

이 지식을 사용하여 각 시대가 끝날 때 유효성 검사 데이터에 대한 SparseCategoricalAccuracy 를 계산해 SparseCategoricalAccuracy .

# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

# Prepare the training dataset.
batch_size = 64
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(64)

다음은 교육 및 평가 루프입니다.

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * 64))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))

Start of epoch 0
Training loss (for one batch) at step 0: 88.3728
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.3446
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.7409
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.4098
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.7732
Seen so far: 51264 samples
Training acc over epoch: 0.7515
Validation acc: 0.8552
Time taken: 5.60s

Start of epoch 1
Training loss (for one batch) at step 0: 0.6751
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.5519
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.6730
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.2807
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.6229
Seen so far: 51264 samples
Training acc over epoch: 0.8577
Validation acc: 0.8908
Time taken: 5.62s

tf.function 훈련 단계 tf.function

TensorFlow 2.0의 기본 런타임은 즉시 실행 입니다. 따라서 위의 교육 루프는 열심히 실행됩니다.

이것은 디버깅에 적합하지만 그래프 컴파일에는 확실한 성능 이점이 있습니다. 계산을 정적 그래프로 설명하면 프레임 워크가 전역 성능 최적화를 적용 할 수 있습니다. 프레임 워크가 다음에 무슨 일이 일어날 지 알지 못한 채 탐욕스럽게 작업을 하나씩 실행하도록 제한되어있는 경우 불가능합니다.

텐서를 입력으로 사용하는 모든 함수를 정적 그래프로 컴파일 할 수 있습니다. 다음과 같이 @tf.function 데코레이터를 추가하면됩니다.

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

평가 단계에서도 똑같이합시다.

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    val_acc_metric.update_state(y, val_logits)

이제이 컴파일 된 학습 단계를 사용하여 학습 루프를 다시 실행 해 보겠습니다.

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * 64))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))

Start of epoch 0
Training loss (for one batch) at step 0: 0.7984
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.6274
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.2540
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.5385
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.2974
Seen so far: 51264 samples
Training acc over epoch: 0.8866
Validation acc: 0.9082
Time taken: 1.35s

Start of epoch 1
Training loss (for one batch) at step 0: 0.5614
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.7772
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5483
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.6677
Seen so far: 38464 samples
Training loss (for one batch) at step 800: 0.5454
Seen so far: 51264 samples
Training acc over epoch: 0.9011
Validation acc: 0.9202
Time taken: 1.10s

훨씬 빠르죠?

모델에 의해 추적 된 손실의 저수준 처리

레이어 및 모델은 self.add_loss(value) 를 호출하는 레이어에 의해 순방향 전달 중에 생성 된 손실을 재귀 적으로 추적합니다. 스칼라 손실 값의 결과 목록은 순방향 패스가 끝날 때 model.losses 속성을 통해 사용할 수 있습니다.

이러한 손실 구성 요소를 사용하려면이를 합산하고 훈련 단계의 주요 손실에 추가해야합니다.

활동 정규화 손실을 생성하는 다음 계층을 고려하십시오.

class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs

그것을 사용하는 정말 간단한 모델을 만들어 봅시다 :

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

이제 훈련 단계는 다음과 같습니다.

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # Add any extra losses created during the forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

요약

이제 기본 제공 교육 루프를 사용하고 처음부터 직접 작성하는 방법에 대해 알아야 할 모든 것을 알았습니다.

결론을 내리기 위해이 가이드에서 배운 모든 내용을 연결하는 간단한 종단 간 예제가 있습니다. MNIST 숫자로 훈련 된 DCGAN입니다.

엔드-투-엔드 예제 : 처음부터 GAN 훈련 루프

GAN (Generative Adversarial Network)에 익숙 할 수 있습니다. GAN은 이미지 훈련 데이터 세트 (이미지의 "잠재 공간")의 잠재 분포를 학습하여 거의 실제처럼 보이는 새로운 이미지를 생성 할 수 있습니다.

GAN은 잠복 공간의 점을 이미지 공간의 점에 매핑하는 "생성자"모델, "분별 자"모델, 실제 이미지 (학습 데이터 세트의)와 가짜의 차이를 구분할 수있는 분류기의 두 부분으로 구성됩니다. 이미지 (생성기 네트워크의 출력).

GAN 교육 루프는 다음과 같습니다.

1) 판별자를 훈련하십시오. -잠재 공간에서 임의의 지점을 샘플링합니다. - "생성기"모델을 통해 포인트를 가짜 이미지로 전환합니다. -실제 이미지의 배치를 가져 와서 생성 된 이미지와 결합합니다. -생성 된 이미지와 실제 이미지를 분류하기 위해 "분별 자"모델을 훈련시킵니다.

2) 발전기를 훈련 시키십시오. -잠재 공간에서 무작위 포인트를 샘플링합니다. - "생성기"네트워크를 통해 포인트를 가짜 이미지로 전환합니다. -실제 이미지의 배치를 가져 와서 생성 된 이미지와 결합합니다. - "generator"모델을 훈련시켜 판별자를 "거짓"하고 가짜 이미지를 실제 이미지로 분류합니다.

GAN의 작동 방식에 대한 훨씬 더 자세한 개요는 Python을 사용한 Deep Learning 항목을 참조하십시오.

이 훈련 루프를 구현해 봅시다. 먼저 가짜와 실수를 구분하기위한 판별자를 만듭니다.

discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()
Model: "discriminator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 14, 14, 64)        640       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 7, 7, 128)         0         
_________________________________________________________________
global_max_pooling2d (Global (None, 128)               0         
_________________________________________________________________
dense_4 (Dense)              (None, 1)                 129       
=================================================================
Total params: 74,625
Trainable params: 74,625
Non-trainable params: 0
_________________________________________________________________

그런 다음 잠재 벡터를 형태 (28, 28, 1) (MNIST 숫자를 나타냄 (28, 28, 1) 출력으로 변환하는 생성기 네트워크를 만들어 보겠습니다.

latent_dim = 128

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

핵심 부분은 훈련 루프입니다. 보시다시피 매우 간단합니다. 훈련 단계 기능은 17 줄만 사용합니다.

# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


@tf.function
def train_step(real_images):
    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Decode them to fake images
    generated_images = generator(random_latent_vectors)
    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(labels.shape)

    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = discriminator(combined_images)
        d_loss = loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, generator.trainable_weights)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
    return d_loss, g_loss, generated_images

이미지 배치에 대해 train_step 을 반복적으로 호출하여 GAN을 훈련시켜 보겠습니다.

우리의 판별 자와 생성기는 convnet이므로 GPU에서이 코드를 실행하고 싶을 것입니다.

import os

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 1  # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Train the discriminator & generator on one batch of real images.
        d_loss, g_loss, generated_images = train_step(real_images)

        # Logging.
        if step % 200 == 0:
            # Print metrics
            print("discriminator loss at step %d: %.2f" % (step, d_loss))
            print("adversarial loss at step %d: %.2f" % (step, g_loss))

            # Save one generated image
            img = tf.keras.preprocessing.image.array_to_img(
                generated_images[0] * 255.0, scale=False
            )
            img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))

        # To limit execution time we stop after 10 steps.
        # Remove the lines below to actually train the model!
        if step > 10:
            break

Start epoch 0
discriminator loss at step 0: 0.68
adversarial loss at step 0: 0.69

그게 다야! Colab GPU에서 30 초 정도 훈련 한 후에 멋진 가짜 MNIST 숫자를 얻을 수 있습니다.