12 月 7 日の Women in ML シンポジウムに参加する今すぐ登録する

トレーニングループの新規作成

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

TensorFlow.org で実行 Google Colabで実行 GitHubでソースを表示 ノートブックをダウンロード

セットアップ

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
2022-08-09 05:09:49.850047: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-08-09 05:09:50.522468: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-09 05:09:50.522708: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-09 05:09:50.522720: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.

はじめに

Keras には、トレーニングと評価ループに使用できる fit()evaluate() がデフォルトで備わっています。これらの使い方については、「ビルトインメソッドによるトレーニングと評価」を参照してください。

fit() の利便性を活用しながらモデルの学習アルゴリズムをカスタマイズするには(fit() を使用して GAN をトレーニングする場合など)、Model クラスをサブクラス化し、fit() 中に繰り返し呼び出される独自の train_step() メソッドを実装します。詳細については、「fit() で行われる処理のカスタマイズ」を参照してください。

トレーニングと評価を非常に低レベルで制御する場合は、独自のトレーニングと評価ループを新規作成する必要があります。このガイドでは、トレーニングと評価ループを新規作成する方法を見ていきます。

GradientTape の使用: 最初のエンドツーエンドの例

GradientTape スコープ内でモデルを呼び出すと、損失値に対するレイヤーのトレーニング可能な重みの勾配を取得できます。オプティマイザのインスタンスを使用すると、これらの勾配を使用してこれらの変数(model.trainable_weights を使用して取得)を更新できます。

では、シンプルな MNIST モデルを見てみましょう。

inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

ミニバッチの勾配を使用してカスタムトレーニングループでトレーニングします。

まず、オプティマイザ、損失関数、およびデータセットが必要です。

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11490434/11490434 [==============================] - 0s 0us/step

トレーニングループは以下のとおりです。

  • エポックをイテレートする for ループを開きます。
  • エポックごとに、バッチでデータセットをイテレートする for ループを開きます。
  • バッチごとに、GradientTape() スコープを開きます。
  • このスコープ内で、モデル(フォワードパス)を呼び出し、損失を計算します。
  • スコープ外で、損失に対するモデルの重みの勾配を取得します。
  • 最後にオプティマイザを使用して、勾配に基づいてモデルの重みを更新します。
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * batch_size))
Start of epoch 0
Training loss (for one batch) at step 0: 88.2665
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.5371
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.0569
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 1.4564
Seen so far: 38464 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.7407
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.4643
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.9472
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.9047
Seen so far: 38464 samples

メトリックの低レベルの処理

この基本ループにメトリックの監視を追加しましょう。

新規作成されたトレーニングループでは、組み込みメトリック(またはユーザーが作成したカスタムメトリック)をすぐに再利用できます。フローは以下のとおりです。

  • ループ開始時にメトリックをインスタンス化します。
  • 各バッチの後に metric.update_state()を 呼び出します。
  • メトリックのその時点の値を表示する必要がある場合は、metric.result() を呼び出します。
  • メトリックの状態をクリアする必要がある場合(通常はエポックの終わり)に metric.reset_states() を呼び出します。

上記を使用して、各エポックの終わりの検証データで SparseCategoricalAccuracy を計算してみましょう。

# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

トレーニングと評価のループは以下のとおりです。

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 96.5239
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.1529
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.9397
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.4302
Seen so far: 38464 samples
Training acc over epoch: 0.7533
Validation acc: 0.8407
Time taken: 8.49s

Start of epoch 1
Training loss (for one batch) at step 0: 0.7190
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.6213
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5086
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.4910
Seen so far: 38464 samples
Training acc over epoch: 0.8540
Validation acc: 0.8815
Time taken: 8.47s

tf.function でトレーニングステップをスピードアップ

TensorFlow 2 のデフォルトのランタイムは Eager Execution です。そのため、上記のトレーニングループは迅速に実行されます。

これはデバッグには最適ですが、グラフのコンパイルには明確なパフォーマンス上の利点があります。計算を静的グラフとして記述することにより、フレームワークはグローバルなパフォーマンスの最適化を適用できます。これは、フレームワークが次に何が起こるかを知らずに、次々とグリーディに演算を実行するように制約されている場合は不可能です。

テンソルを入力として受け取る関数は、静的グラフにコンパイルできます。 次のように、@tf.function デコレータを追加するだけです。

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

評価ステップでも同じように実行できます。

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    val_acc_metric.update_state(y, val_logits)

次に、このコンパイルされたトレーニングステップでトレーニングループを再度実行します。

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * batch_size))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 0.3868
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.6356
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.3762
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.5478
Seen so far: 38464 samples
Training acc over epoch: 0.8828
Validation acc: 0.8959
Time taken: 1.48s

Start of epoch 1
Training loss (for one batch) at step 0: 0.2488
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.2469
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5903
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.4073
Seen so far: 38464 samples
Training acc over epoch: 0.8967
Validation acc: 0.8982
Time taken: 1.13s

スピードアップしました。

モデルにより追跡される損失の低レベルの処理

レイヤーとモデルは、self.add_loss(value)を呼び出すレイヤーによるフォワードパス中に発生した損失を再帰的に追跡します。結果のスカラー損失値のリストは、フォワードパスの最後にあるプロパティ model.losses に表示されます。

これらの損失を使用する場合は、合計して、トレーニングステップの主な損失に追加する必要があります。

次のレイヤーはアクティビティの正規化の損失を作成します。

class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs

これを使用する非常にシンプルなモデルを構築しましょう。

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

トレーニングステップは次のようになります。

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # Add any extra losses created during the forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

まとめ

ここでは、組み込みのトレーニングループを使用して、独自のループを新規作成する方法を見てきました。

最後に、このガイドで学習したすべてをまとめる簡単なエンドツーエンドの例を見てみましょう。MNIST の数字データでトレーニングされた DCGAN です。

エンドツーエンドの例:GAN トレーニングループの新規作成

敵対的生成ネットワーク(GAN)というのを聞いたことがあるかもしれません。GAN は、画像のトレーニングデータセット(画像の「潜在空間」)の潜在的な分布を学習することにより、ほぼ現実的な新しい画像を生成できます。

GAN は「ジェネレータ」モデルと「ディスクリミネータ」モデルの 2 つで構成されています。「ジェネレータ」モデルは潜在空間の点を画像空間の点にマッピングし、「ディスクリミネータ」モデルは、実物の画像(トレーニングデータセットから)と偽の画像(ジェネレータネットワークの出力)の違いを判別する分類器です。

GANトレーニングループは以下のとおりです。

  1. ディスクリミネータをトレーニングする。
  • 潜在空間のランダムポイントのバッチをサンプリングする。
  • 「ジェネレータ」モデルを使用して、ポイントを偽の画像に変換する。
  • 実際の画像のバッチを取得し、生成された画像と組み合わせる。
  • 「ディスクリミネータ」モデルをトレーニングして、生成された画像と実際の画像を分類する。
  1. ジェネレータをトレーニングする。
  • 潜在空間のランダムポイントをサンプリングする。
  • 「ジェネレータ」ネットワークを介して、ポイントを偽の画像に変換する。
  • 実際の画像のバッチを取得し、生成された画像と組み合わせる。
  • 「ジェネレータ」モデルをトレーニングして、ディスクリミネータを「だまし」、偽の画像を実物として分類する。

GAN の詳細については、「Deep Learning with Python」を参照してください。

このトレーニングループを実装しましょう。まず、偽の数字と実際の数字を分類するためのディスクリミネータを作成します。

discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()
Model: "discriminator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 conv2d (Conv2D)             (None, 14, 14, 64)        640       
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 14, 14, 64)        0         
                                                                 
 conv2d_1 (Conv2D)           (None, 7, 7, 128)         73856     
                                                                 
 leaky_re_lu_1 (LeakyReLU)   (None, 7, 7, 128)         0         
                                                                 
 global_max_pooling2d (Globa  (None, 128)              0         
 lMaxPooling2D)                                                  
                                                                 
 dense_4 (Dense)             (None, 1)                 129       
                                                                 
=================================================================
Total params: 74,625
Trainable params: 74,625
Non-trainable params: 0
_________________________________________________________________

次に、潜在的なベクトルを形状(28, 28, 1)(MNISTの数字を表す)の出力に変換するジェネレータネットワークを作成します。

latent_dim = 128

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

ここで重要なのが、トレーニングループです。ご覧のとおり、非常に簡単です。トレーニングステップの関数は 17 行だけです。

# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


@tf.function
def train_step(real_images):
    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Decode them to fake images
    generated_images = generator(random_latent_vectors)
    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(labels.shape)

    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = discriminator(combined_images)
        d_loss = loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, generator.trainable_weights)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
    return d_loss, g_loss, generated_images

画像のバッチに対して繰り返し train_step を呼び出して、GAN をトレーニングします。

ジェネレータとディスクリミネータは畳み込みニューラルネットワークなので、このコードを GPU で実行することができます。

import os

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 1  # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Train the discriminator & generator on one batch of real images.
        d_loss, g_loss, generated_images = train_step(real_images)

        # Logging.
        if step % 200 == 0:
            # Print metrics
            print("discriminator loss at step %d: %.2f" % (step, d_loss))
            print("adversarial loss at step %d: %.2f" % (step, g_loss))

            # Save one generated image
            img = tf.keras.preprocessing.image.array_to_img(
                generated_images[0] * 255.0, scale=False
            )
            img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))

        # To limit execution time we stop after 10 steps.
        # Remove the lines below to actually train the model!
        if step > 10:
            break
Start epoch 0
discriminator loss at step 0: 0.71
adversarial loss at step 0: 0.70

以上です。Colab GPU で約 30 秒のトレーニングを行うと、実物のような偽の MNIST の数字データが表示されます。