Написание пользовательских слоев и моделей с Keras

Смотрите на TensorFlow.org Запустите в Google Colab Изучайте код на GitHub Скачайте ноутбук

Установка

from __future__ import absolute_import, division, print_function, unicode_literals

try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass
import tensorflow as tf

tf.keras.backend.clear_session()  # Для легкого сброса состояния ноутбука.

Класс Layer

Слои инкапсулируют состояние (веса) и некоторые вычисления

Основная структура даннных с которой вы будете работать это Layer. Слой инакпсулирует и состояние ("веса" слоя) и преобразование из входных данных в выходные ("вызов", прямое распространение на слое).

Вот полносвязный слой. У него есть состояние: переменные w и b.

from tensorflow.keras import layers


class Linear(layers.Layer):

  def __init__(self, units=32, input_dim=32):
    super(Linear, self).__init__()
    w_init = tf.random_normal_initializer()
    self.w = tf.Variable(initial_value=w_init(shape=(input_dim, units),
                                              dtype='float32'),
                         trainable=True)
    b_init = tf.zeros_initializer()
    self.b = tf.Variable(initial_value=b_init(shape=(units,),
                                              dtype='float32'),
                         trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[-0.0175286  -0.02815745  0.0614832   0.06159694]
 [-0.0175286  -0.02815745  0.0614832   0.06159694]], shape=(2, 4), dtype=float32)

Обратите внимание, что весаw и b автоматически отслеживаются слоем после их установки в качестве атрибутов слоя:

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

Обратите внимание, что у вас также есть более быстрый способ добавления веса к слою: методadd_weight:

class Linear(layers.Layer):

  def __init__(self, units=32, input_dim=32):
    super(Linear, self).__init__()
    self.w = self.add_weight(shape=(input_dim, units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(units,),
                             initializer='zeros',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[ 0.12667263  0.08049832 -0.11591484  0.00338535]
 [ 0.12667263  0.08049832 -0.11591484  0.00338535]], shape=(2, 4), dtype=float32)

У слоев могут быть необучаемые веса

Кроме обучаемых весов, вы также можете добавить к слою не обучаемые веса. Такие веса не должны учитываться при обратном распространении ошибки, , когда вы тренируете слой.

Вот так можно добавить и использовать не обучаемый вес:

class ComputeSum(layers.Layer):

  def __init__(self, input_dim):
    super(ComputeSum, self).__init__()
    self.total = tf.Variable(initial_value=tf.zeros((input_dim,)),
                             trainable=False)

  def call(self, inputs):
    self.total.assign_add(tf.reduce_sum(inputs, axis=0))
    return self.total

x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.]
[4. 4.]

Это часть layer.weights, но он классифицируется как необучаемый вес:

print('weights:', len(my_sum.weights))
print('non-trainable weights:', len(my_sum.non_trainable_weights))

# Это не включено в обучаемы веса:
print('trainable_weights:', my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []

Хорошая практика: откладывать создание весов до тех пор, пока не станет известна форма входных данных

В приведенном выше примере с логистической регрессией, наш слой Linear layer принимает аргумент input_dim который был использован чтобы посчитать размерности весов w и b в __init__:

class Linear(layers.Layer):

  def __init__(self, units=32, input_dim=32):
      super(Linear, self).__init__()
      self.w = self.add_weight(shape=(input_dim, units),
                               initializer='random_normal',
                               trainable=True)
      self.b = self.add_weight(shape=(units,),
                               initializer='zeros',
                               trainable=True)

Часто вы можете не знать заранее размер ваших входных данных, и вы хотели бы лениво создавать веса, когда это значение станет известным, через какое-то время после создания экземпляра слоя.

В Keras API мы рекомендуем создавать веса слоев в методе build(input_shape) вашего слоя. Как в этом примере:

class Linear(layers.Layer):

  def __init__(self, units=32):
    super(Linear, self).__init__()
    self.units = units

  def build(self, input_shape):
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

Метод __call__ вашего слоя автоматически запускает build при первом вызове. Теперь у вас есть слой, который ленив и прост в использовании:

linear_layer = Linear(32)  # При создании экземпляра мы еще не знаем, на каких входах он будет вызываться
y = linear_layer(x)  # Веса слоя создается динамически при первом вызове

Слои рекурсивно компонуемы

Если вы присвоите экземпляр слоя как атрибут другого слоя, внешний слой начнет отслеживать веса внутреннего.

Мы рекомендуем создавать такие подслои в методе __init__ (поскольку подслои обычно имеют метод build, они будут собраны, когда будет собран внешний слой).

# Предположим что мы переиспользуем класс Linear
# с методом `build` который мы определили выше.

class MLPBlock(layers.Layer):

  def __init__(self):
    super(MLPBlock, self).__init__()
    self.linear_1 = Linear(32)
    self.linear_2 = Linear(32)
    self.linear_3 = Linear(1)

  def call(self, inputs):
    x = self.linear_1(inputs)
    x = tf.nn.relu(x)
    x = self.linear_2(x)
    x = tf.nn.relu(x)
    return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # Первый вызов `mlp` создаст веса
print('weights:', len(mlp.weights))
print('trainable weights:', len(mlp.trainable_weights))
weights: 6
trainable weights: 6

Слои рекурсивно собирают потери, созданные во время прямого прохода

При написании метода слоя call вы можете создать тензоры потерь, которые вы можете использовать позже, при написании цикла обучения. Это можно сделать путем вызова self.add_loss (value):

# Слой, создающий activity regularization loss
class ActivityRegularizationLayer(layers.Layer):

  def __init__(self, rate=1e-2):
    super(ActivityRegularizationLayer, self).__init__()
    self.rate = rate

  def call(self, inputs):
    self.add_loss(self.rate * tf.reduce_sum(inputs))
    return inputs

Значения потерь (в том числе созданные любым внутренним слоем) могут быть извлечены с помощью layer.losses. Это свойство сбрасывается в начале каждого__call__ на слой верхнего уровня, так что layer.losses всегда содержит значения потерь, созданные во время последнего прохода вперед.

class OuterLayer(layers.Layer):

  def __init__(self):
    super(OuterLayer, self).__init__()
    self.activity_reg = ActivityRegularizationLayer(1e-2)

  def call(self, inputs):
    return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # Потерь пока нет, так как слой не вызываался
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # Мы создали оно значение потерь

# `layer.losses` сбрасывается в начале каждого __call__
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # Это потери созданные во время вызова выше

Кроме того, свойствоloss также содержит потери регуляризации, созданные для весов любого внутреннего слоя:

class OuterLayer(layers.Layer):

  def __init__(self):
    super(OuterLayer, self).__init__()
    self.dense = layers.Dense(32, kernel_regularizer=tf.keras.regularizers.l2(1e-3))

  def call(self, inputs):
    return self.dense(inputs)


layer = OuterLayer()
_ = layer(tf.zeros((1, 1)))

# Это `1e-3 * sum(layer.dense.kernel ** 2)`,
# созданное `kernel_regularizer` выше.
print(layer.losses)
[<tf.Tensor: shape=(), dtype=float32, numpy=0.0015689448>]

Предполагается, что эти потери будут учитываться при написании циклов обучения, например:

# Создаем экземпляр оптимизатора.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Итерация по батчам набора данных.
for x_batch_train, y_batch_train in train_dataset:
  with tf.GradientTape() as tape:
    logits = layer(x_batch_train)  # Логиты для этого минибатча
    # Значения потерь для этого минибатча
    loss_value = loss_fn(y_batch_train, logits)
    # Добавим дополнительные потери созданные во время этого прохода вперед:
    loss_value += sum(model.losses)

  grads = tape.gradient(loss_value, model.trainable_weights)
  optimizer.apply_gradients(zip(grads, model.trainable_weights))

Подробное руководство по написанию циклов обучения см. во втором разделе руководства по обучению и оценке.

Вы можете опционально включить сериализацию на своих слоях

Если вам нужно, чтобы ваш кастомный слой был сериализуем как часть Functional model, вы можете опционально реализовать метод get_config:

class Linear(layers.Layer):

  def __init__(self, units=32):
    super(Linear, self).__init__()
    self.units = units

  def build(self, input_shape):
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

  def get_config(self):
    return {'units': self.units}


# Сейчас вы можете пересоздать слой из его конфигурации:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}

Обратите внимание, что метод__init__ базового класса Layer принимает некоторые аргументы, в частностиname и dtype. Рекомендуется передать эти аргументы родительскому классу в__init__ и добавить их в конфиг слоя:

class Linear(layers.Layer):

  def __init__(self, units=32, **kwargs):
    super(Linear, self).__init__(**kwargs)
    self.units = units

  def build(self, input_shape):
    self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
    self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

  def call(self, inputs):
    return tf.matmul(inputs, self.w) + self.b

  def get_config(self):
    config = super(Linear, self).get_config()
    config.update({'units': self.units})
    return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_8', 'trainable': True, 'dtype': 'float32', 'units': 64}

Если вам требуется больше гибкости при десериализации слоя из его конфига, вы также можете переопределить метод классаfrom_config. Это базовая реализация from_config:

def from_config(cls, config):
  return cls(**config)

Чтобы узнать больше о сериализации и сохранении см. полное Руководство по сохранению и сериализации моделей.

Привилегированный аргумент training в методе call

Некоторые уровни, в частности слои BatchNormalization и Dropout, имеют различное поведение во время обучения и вывода. Для таких слоев стандартной практикой является представление (булева) аргумента training в методе call.

Представляя этот аргумент в call, вы позволяете встроенным циклам обучения и оценки (например, fit) правильно использовать слой в обучении и выводе.

class CustomDropout(layers.Layer):

  def __init__(self, rate, **kwargs):
    super(CustomDropout, self).__init__(**kwargs)
    self.rate = rate

  def call(self, inputs, training=None):
    if training:
        return tf.nn.dropout(inputs, rate=self.rate)
    return inputs

Построение моделей

Класс Model

В общем, вы будете использовать классLayer для определения внутренних вычислительных блоков, а класс Model для определения внешней модели - объекта, который вы будете обучать.

Например, в модели ResNet50, у вас будет несколько блоков ResNet сабклассирующих Layer, и единственный Model охватывающий всю сеть ResNet50.

У класса Model тот же API что и у Layer, со следующими раазличиями:

  • Он предоставляет встроенные циклы обучения, оценки и прогнозирования (model.fit(), model.evaluate(), model.predict()).
  • Он предоставляет список своих внутренних слоев с помощью свойства model.layers.
  • Он предоставляет API сохранения и сериализации.

По сути, класс "Layer" соответствует тому, что мы называем в литературе "слой" (как в "сверточный слой" или "реккурентный слой") или "блок" (как в "блок ResNet" или "Inception block").

Между тем, класс "Model" соответствует тому, что мы называем в литературе "модель" (как в "модель глубокого обучения") или "сеть" (как в "нейронная сеть").

Например, мы можем взять наш вышеприведенный мини-resnet пример, и использовать его для построения Model который мы можем обучить с помощью fit(), которую мы можем сохранить с save_weights:

class ResNet(tf.keras.Model):

    def __init__(self):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save_weights(filepath)

Собираем все вместе: полный пример

Вот то что мы выучили к этому моменту:

  • A Layer инкапсулирует состояние (созданное в __init__ или build) и некоторые вычисления (в call).
  • Слои могут быть рекурсивно вложены чтобы создать новый, больший вычислительный блок.
  • Слои могут создавать и отслеживать потери (обычно потери регуляризации).
  • Внешний контейнер, объект который вы будете обучать это Model. Model похож на Layer, с добавлением утилит обучения и сериализации.

Давайте соединим все эти вещи в одном сквозном примере: мы собираемся реализовать вариационный автоэкодер (VAE). Мы будем обучать его на цифрах MNIST.

Наш VAE будет подклассом Model, построенным вложенной композицией слоев являющихся подклассами Layer. У него будут потери регуляризации (KL-дивергенция).

class Sampling(layers.Layer):
  """Использует (z_mean, z_log_var) для выборки z, вектора кодирующего цифру."""

  def call(self, inputs):
    z_mean, z_log_var = inputs
    batch = tf.shape(z_mean)[0]
    dim = tf.shape(z_mean)[1]
    epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
    return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
  """Отображает цифры MNIST в тройки (z_mean, z_log_var, z)."""

  def __init__(self,
               latent_dim=32,
               intermediate_dim=64,
               name='encoder',
               **kwargs):
    super(Encoder, self).__init__(name=name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
    self.dense_mean = layers.Dense(latent_dim)
    self.dense_log_var = layers.Dense(latent_dim)
    self.sampling = Sampling()

  def call(self, inputs):
    x = self.dense_proj(inputs)
    z_mean = self.dense_mean(x)
    z_log_var = self.dense_log_var(x)
    z = self.sampling((z_mean, z_log_var))
    return z_mean, z_log_var, z


class Decoder(layers.Layer):
  """Конвертирует z, закодированный вектор цифры обратно в читаемую цифру."""

  def __init__(self,
               original_dim,
               intermediate_dim=64,
               name='decoder',
               **kwargs):
    super(Decoder, self).__init__(name=name, **kwargs)
    self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
    self.dense_output = layers.Dense(original_dim, activation='sigmoid')

  def call(self, inputs):
    x = self.dense_proj(inputs)
    return self.dense_output(x)


class VariationalAutoEncoder(tf.keras.Model):
  """Соединяет энкодер и декодер в сквозную модель для обучения."""

  def __init__(self,
               original_dim,
               intermediate_dim=64,
               latent_dim=32,
               name='autoencoder',
               **kwargs):
    super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
    self.original_dim = original_dim
    self.encoder = Encoder(latent_dim=latent_dim,
                           intermediate_dim=intermediate_dim)
    self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

  def call(self, inputs):
    z_mean, z_log_var, z = self.encoder(inputs)
    reconstructed = self.decoder(z)
    # Добавляет потери регуляризации - KL-дивергенцию.
    kl_loss = - 0.5 * tf.reduce_mean(
        z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
    self.add_loss(kl_loss)
    return reconstructed
original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 3

# Итерируем по эпохам.
for epoch in range(epochs):
  print('Start of epoch %d' % (epoch,))

  # Итерируем по пакетам в датасете.
  for step, x_batch_train in enumerate(train_dataset):
    with tf.GradientTape() as tape:
      reconstructed = vae(x_batch_train)
      # Compute reconstruction loss
      loss = mse_loss_fn(x_batch_train, reconstructed)
      loss += sum(vae.losses)  # Add KLD regularization loss

    grads = tape.gradient(loss, vae.trainable_weights)
    optimizer.apply_gradients(zip(grads, vae.trainable_weights))

    loss_metric(loss)

    if step % 100 == 0:
      print('step %s: mean loss = %s' % (step, loss_metric.result()))

Start of epoch 0
step 0: mean loss = tf.Tensor(0.31630546, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.12512562, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.09897864, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.08902775, shape=(), dtype=float32)
step 400: mean loss = tf.Tensor(0.08408616, shape=(), dtype=float32)
step 500: mean loss = tf.Tensor(0.080782406, shape=(), dtype=float32)
step 600: mean loss = tf.Tensor(0.078656696, shape=(), dtype=float32)
step 700: mean loss = tf.Tensor(0.07707058, shape=(), dtype=float32)
step 800: mean loss = tf.Tensor(0.07590856, shape=(), dtype=float32)
step 900: mean loss = tf.Tensor(0.074899964, shape=(), dtype=float32)
Start of epoch 1
step 0: mean loss = tf.Tensor(0.07461445, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.07397132, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.0734705, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.07300055, shape=(), dtype=float32)
step 400: mean loss = tf.Tensor(0.07267489, shape=(), dtype=float32)
step 500: mean loss = tf.Tensor(0.07227554, shape=(), dtype=float32)
step 600: mean loss = tf.Tensor(0.07197886, shape=(), dtype=float32)
step 700: mean loss = tf.Tensor(0.07168947, shape=(), dtype=float32)
step 800: mean loss = tf.Tensor(0.071452394, shape=(), dtype=float32)
step 900: mean loss = tf.Tensor(0.071195945, shape=(), dtype=float32)
Start of epoch 2
step 0: mean loss = tf.Tensor(0.071118824, shape=(), dtype=float32)
step 100: mean loss = tf.Tensor(0.070942484, shape=(), dtype=float32)
step 200: mean loss = tf.Tensor(0.07081476, shape=(), dtype=float32)
step 300: mean loss = tf.Tensor(0.07066102, shape=(), dtype=float32)
step 400: mean loss = tf.Tensor(0.07056587, shape=(), dtype=float32)
step 500: mean loss = tf.Tensor(0.07042001, shape=(), dtype=float32)
step 600: mean loss = tf.Tensor(0.07030099, shape=(), dtype=float32)
step 700: mean loss = tf.Tensor(0.070176706, shape=(), dtype=float32)
step 800: mean loss = tf.Tensor(0.07007897, shape=(), dtype=float32)
step 900: mean loss = tf.Tensor(0.069954, shape=(), dtype=float32)

Заметьте что поскольку VAE сабклассирует Model, у него есть встроенные циклы обучения. Так что вы можете обучить его следующим образом:

vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
Epoch 1/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0746
Epoch 2/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0676
Epoch 3/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0676

<tensorflow.python.keras.callbacks.History at 0x7f15b0752748>

Помимо объектно-ориентированной разработки: Functional API

Не слишком ли много в этом примере объектно-ориентированной разработки для вас? Вы также можете создавать модели, используя Functional API. Важно, что выбор одного стиля или другого не мешает вам сомещать компоненты, написанные в разных стилях: вы всегда можете сочетать и смешивать их.

Например, вариант с Functional API переиспользует тот же слой Sampling который мы определили в вышеприведенном примере.

original_dim = 784
intermediate_dim = 64
latent_dim = 32

# Определим модель энкодера.
original_inputs = tf.keras.Input(shape=(original_dim,), name='encoder_input')
x = layers.Dense(intermediate_dim, activation='relu')(original_inputs)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name='encoder')

# Определим модель декодера.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = layers.Dense(original_dim, activation='sigmoid')(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name='decoder')

# Определим модель VAE.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name='vae')

# Добавим KL-дивергенцию потери регуляризации.
kl_loss = - 0.5 * tf.reduce_mean(
    z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

# Обучим.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
Epoch 1/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0747
Epoch 2/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0676
Epoch 3/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0676

<tensorflow.python.keras.callbacks.History at 0x7f15b0574fd0>