Model.fit の処理をカスタマイズする

TensorFlow.org で表示 Google Colab で実行 GitHub でソースを表示 ノートブックをダウンロード

はじめに

教師あり学習を実行するときに fit() を使用するとスムーズに学習を進めることができます。

独自のトレーニングループを新規で書く必要がある場合には、GradientTape を使用すると、細かく制御することができます。

しかし、カスタムトレーニングアルゴリズムが必要ながらも、コールバック、組み込みの分散サポート、ステップ結合など、fit() の便利な機能を利用したい場合には、どうすればよいのでしょうか?

Keras の基本原則は、複雑性のプログレッシブディスクロージャ―です。常に段階的に低レベルのワークフローに入ることが可能で、高レベルの機能性がユースケースと完全に一致しない場合でも、急激に性能が落ちるようなことはありません。相応の高レベルの利便性を維持しながら細部をさらに制御することができます。

fit() の動作をカスタマイズする必要がある場合は、Model クラスのトレーニングステップ関数をオーバーライドする必要があります。これはデータのバッチごとに fit() に呼び出される関数です。これによって、通常通り fit() を呼び出せるようになり、独自の学習アルゴリズムが実行されるようになります。

このパターンは Functional API を使用したモデル構築を妨げるものではないことに注意してください。これは、Sequential モデル、Functional API モデル、サブクラス化されたモデルのいずれを構築する場合にも適用可能です。

では、その仕組みを見ていきましょう。

セットアップ

TensorFlow 2.2 以降が必要です。

import tensorflow as tf
from tensorflow import keras

最初の簡単な例

簡単な例から始めてみましょう。

  • keras.Model をサブクラス化する新しいクラスを作成します。
  • train_step(self, data) メソッドだけをオーバーライドします。
  • メトリクス名(損失を含む)をマッピングするディクショナリを現在の値に返します。

入力引数の data は、トレーニングデータとして適合するために渡される値です。

  • fit(x, y, ...) を呼び出して Numpy 配列を渡す場合は、data はタプル型 (x, y) になります。
  • fit(dataset, ...) を呼び出して tf.data.Dataset を渡す場合は、data は各バッチで dataset により生成される値になります。

train_step メソッドの本体には、既に使い慣れているものと同様の定期的なトレーニングアップデートを実装しています。重要なのは、損失の計算を self.compiled_loss を介して行っていることで、それによって compile() に渡された損失関数がラップされています。

同様に、self.compiled_metrics.update_state(y, y_pred) を呼び出して compile() に渡されたメトリクスの状態を更新し、最後に self.metrics の結果をクエリして現在の値を取得しています。

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

これを試してみましょう。

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
2021-08-14 01:23:00.531213: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:00.540234: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:00.541307: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:00.543220: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-14 01:23:00.543884: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:00.544864: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:00.545806: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:01.187001: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:01.188040: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:01.188904: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-14 01:23:01.189771: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 14648 MB memory:  -> device: 0, name: Tesla V100-SXM2-16GB, pci bus id: 0000:00:05.0, compute capability: 7.0
2021-08-14 01:23:01.718738: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 0.3302 - mae: 0.4605
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2194 - mae: 0.3751
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2144 - mae: 0.3716
<keras.callbacks.History at 0x7f54347e4ad0>

低レベルにする

当然ながら、compile() に損失関数を渡すことを省略し、代わりに train_step ですべてを手動で実行することは可能です。これはメトリクスの場合でも同様です。

オプティマイザの構成に compile() のみを使用した、低レベルの例を次に示します。

  • まず、損失と MAE スコアを追跡する Metric インスタンスを作成します。
  • これらのメトリクスの状態を更新するカスタム train_step() を実装し(メトリクスで update_state() を呼び出します)、現在の平均値を返して進捗バーで表示し、任意のコールバックに渡せるようにメトリクスをクエリします(result() を使用)。
  • エポックごとにメトリクスに reset_states() を呼び出す必要があるところに注意してください。呼び出さない場合、result() は通常処理しているエポックごとの平均ではなく、トレーニングを開始してからの平均を返してしまいます。幸いにも、これはフレームワークが行ってくれるため、モデルの metrics プロパティにリセットするメトリクスをリストするだけで実現できます。モデルは、そこにリストされているオブジェクトに対する reset_states() の呼び出しを各 fit() エポックの開始時または evaluate() への呼び出しの開始時に行うようになります。
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 2ms/step - loss: 2.0085 - mae: 1.3238
Epoch 2/5
32/32 [==============================] - 0s 2ms/step - loss: 0.9698 - mae: 0.8642
Epoch 3/5
32/32 [==============================] - 0s 2ms/step - loss: 0.4847 - mae: 0.5772
Epoch 4/5
32/32 [==============================] - 0s 2ms/step - loss: 0.3102 - mae: 0.4482
Epoch 5/5
32/32 [==============================] - 0s 2ms/step - loss: 0.2630 - mae: 0.4093
<keras.callbacks.History at 0x7f54345e0a90>

sample_weightclass_weight をサポートする

最初の基本的な例では、サンプルの重み付けについては何も言及していないことに気付いているかもしれません。fit() の引数 sample_weightclass_weight をサポートする場合には、次のようにします。

  • data 引数から sample_weight をアンパックします。
  • それを compiled_losscompiled_metrics に渡します(もちろん、 損失とメトリクスが compile() に依存しない場合は手動での適用が可能です)。
  • それがリストです。
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1631 - mae: 0.4582
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1194 - mae: 0.3962
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1167 - mae: 0.3923
<keras.callbacks.History at 0x7f5434518b10>

独自の評価ステップを提供する

model.evaluate() への呼び出しに同じことをする場合はどうしたらよいでしょう?その場合は、まったく同じ方法で test_step をオーバーライドします。これは次のようになります。

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 0.3380 - mae: 0.4728
[0.33799418807029724, 0.47279784083366394]

まとめ: エンドツーエンド GAN の例

ここで学んだことをすべて採り入れたエンドツーエンドの例を見てみましょう。

以下を検討してみましょう。

  • 28x28x1 の画像を生成するジェネレーターネットワーク。
  • 28x28x1 の画像を 2 つのクラス(「偽物」と「本物」)に分類するディスクリミネーターネットワーク。
  • それぞれに 1 つのオプティマイザ。
  • ディスクリミネーターをトレーニングする損失関数。
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

ここにフィーチャーコンプリートの GAN クラスがあります。compile()をオーバーライドして独自のシグネチャを使用することにより、GAN アルゴリズム全体をtrain_stepの 17 行で実装しています。

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

試運転してみましょう。

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
2021-08-14 01:23:06.938291: I tensorflow/stream_executor/cuda/cuda_dnn.cc:369] Loaded cuDNN version 8100
2021-08-14 01:23:08.321107: I tensorflow/core/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory
100/100 [==============================] - 4s 12ms/step - d_loss: 0.4089 - g_loss: 0.8657
<keras.callbacks.History at 0x7f5434380710>

ディープラーニングの背後にある考え方は単純なわけですから、当然、実装も単純なのです。