Help protect the Great Barrier Reef with TensorFlow on Kaggle

# 通过子类化创建新的层和模型

## 设置

``````import tensorflow as tf
from tensorflow import keras
``````

## `Layer` 类：状态（权重）和部分计算的组合

Keras 的一个中心抽象是 `Layer` 类。层封装了状态（层的“权重”）和从输入到输出的转换（“调用”，即层的前向传递）。

``````class Linear(keras.layers.Layer):
def __init__(self, units=32, input_dim=32):
super(Linear, self).__init__()
w_init = tf.random_normal_initializer()
self.w = tf.Variable(
initial_value=w_init(shape=(input_dim, units), dtype="float32"),
trainable=True,
)
b_init = tf.zeros_initializer()
self.b = tf.Variable(
initial_value=b_init(shape=(units,), dtype="float32"), trainable=True
)

def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
``````

``````x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
``````
```2021-08-13 19:58:32.393148: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.401011: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.401904: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.403509: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-13 19:58:32.404028: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.404900: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.405739: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.975828: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.976731: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.977545: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-08-13 19:58:32.978384: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1510] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 14648 MB memory:  -> device: 0, name: Tesla V100-SXM2-16GB, pci bus id: 0000:00:05.0, compute capability: 7.0
tf.Tensor(
[[ 0.04646244  0.18147472 -0.03977904 -0.01213008]
[ 0.04646244  0.18147472 -0.03977904 -0.01213008]], shape=(2, 4), dtype=float32)
```

``````assert linear_layer.weights == [linear_layer.w, linear_layer.b]
``````

``````class Linear(keras.layers.Layer):
def __init__(self, units=32, input_dim=32):
super(Linear, self).__init__()
shape=(input_dim, units), initializer="random_normal", trainable=True
)
self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
``````
```tf.Tensor(
[[-0.03581849  0.09276912  0.03415143  0.02351041]
[-0.03581849  0.09276912  0.03415143  0.02351041]], shape=(2, 4), dtype=float32)
```

## 层可以具有不可训练权重

``````class ComputeSum(keras.layers.Layer):
def __init__(self, input_dim):
super(ComputeSum, self).__init__()
self.total = tf.Variable(initial_value=tf.zeros((input_dim,)), trainable=False)

def call(self, inputs):
return self.total

x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
``````
```[2. 2.]
[4. 4.]
```

``````print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print("trainable_weights:", my_sum.trainable_weights)
``````
```weights: 1
non-trainable weights: 1
trainable_weights: []
```

## 最佳做法：将权重创建推迟到得知输入的形状之后

``````class Linear(keras.layers.Layer):
def __init__(self, units=32, input_dim=32):
super(Linear, self).__init__()
shape=(input_dim, units), initializer="random_normal", trainable=True
)
self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
``````

``````class Linear(keras.layers.Layer):
def __init__(self, units=32):
super(Linear, self).__init__()
self.units = units

def build(self, input_shape):
shape=(input_shape[-1], self.units),
initializer="random_normal",
trainable=True,
)
shape=(self.units,), initializer="random_normal", trainable=True
)

def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
``````

``````# At instantiation, we don't know on what inputs this is going to get called
linear_layer = Linear(32)

# The layer's weights are created dynamically the first time the layer is called
y = linear_layer(x)
``````

## 层可递归组合

``````# Let's assume we are reusing the Linear class
# with a `build` method that we defined above.

class MLPBlock(keras.layers.Layer):
def __init__(self):
super(MLPBlock, self).__init__()
self.linear_1 = Linear(32)
self.linear_2 = Linear(32)
self.linear_3 = Linear(1)

def call(self, inputs):
x = self.linear_1(inputs)
x = tf.nn.relu(x)
x = self.linear_2(x)
x = tf.nn.relu(x)
return self.linear_3(x)

mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
``````
```weights: 6
trainable weights: 6
```

## `add_loss()` 方法

``````# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
def __init__(self, rate=1e-2):
super(ActivityRegularizationLayer, self).__init__()
self.rate = rate

def call(self, inputs):
return inputs
``````

``````class OuterLayer(keras.layers.Layer):
def __init__(self):
super(OuterLayer, self).__init__()
self.activity_reg = ActivityRegularizationLayer(1e-2)

def call(self, inputs):
return self.activity_reg(inputs)

layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called

_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # This is the loss created during the call above
``````

``````class OuterLayerWithKernelRegularizer(keras.layers.Layer):
def __init__(self):
super(OuterLayerWithKernelRegularizer, self).__init__()
self.dense = keras.layers.Dense(
32, kernel_regularizer=tf.keras.regularizers.l2(1e-3)
)

def call(self, inputs):
return self.dense(inputs)

layer = OuterLayerWithKernelRegularizer()
_ = layer(tf.zeros((1, 1)))

# This is `1e-3 * sum(layer.dense.kernel ** 2)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
``````
```[<tf.Tensor: shape=(), dtype=float32, numpy=0.0013762739>]
```

``````# Instantiate an optimizer.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
with tf.GradientTape() as tape:
logits = layer(x_batch_train)  # Logits for this minibatch
# Loss value for this minibatch
loss_value = loss_fn(y_batch_train, logits)
# Add extra losses created during this forward pass:
loss_value += sum(model.losses)

``````

``````import numpy as np

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# If there is a loss passed in `compile`, the regularization
# losses get added to it
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# It's also possible not to pass any loss in `compile`,
# since the model already has a loss to minimize, via the `add_loss`
# call during the forward pass!
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
``````
```1/1 [==============================] - 0s 99ms/step - loss: 0.0991
1/1 [==============================] - 0s 42ms/step - loss: 0.0228
2021-08-13 19:58:33.977054: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
<keras.callbacks.History at 0x7fcc643809d0>
```

## `add_metric()` 方法

`add_loss()` 类似，层还具有 `add_metric()` 方法，用于在训练过程中跟踪数量的移动平均值。

``````class LogisticEndpoint(keras.layers.Layer):
def __init__(self, name=None):
super(LogisticEndpoint, self).__init__(name=name)
self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
self.accuracy_fn = keras.metrics.BinaryAccuracy()

def call(self, targets, logits, sample_weights=None):
# Compute the training-time loss value and add it
# to the layer using `self.add_loss()`.
loss = self.loss_fn(targets, logits, sample_weights)

# Log accuracy as a metric and add it
# to the layer using `self.add_metric()`.
acc = self.accuracy_fn(targets, logits, sample_weights)

# Return the inference-time prediction tensor (for `.predict()`).
return tf.nn.softmax(logits)
``````

``````layer = LogisticEndpoint()

targets = tf.ones((2, 2))
logits = tf.ones((2, 2))
y = layer(targets, logits)

print("layer.metrics:", layer.metrics)
print("current accuracy value:", float(layer.metrics[0].result()))
``````
```layer.metrics: [<keras.metrics.BinaryAccuracy object at 0x7fcd1c589690>]
current accuracy value: 1.0
```

`add_loss()` 一样，这些指标也是通过 `fit()` 跟踪的：

``````inputs = keras.Input(shape=(3,), name="inputs")
targets = keras.Input(shape=(10,), name="targets")
logits = keras.layers.Dense(10)(inputs)
predictions = LogisticEndpoint(name="predictions")(logits, targets)

model = keras.Model(inputs=[inputs, targets], outputs=predictions)

data = {
"inputs": np.random.random((3, 3)),
"targets": np.random.random((3, 10)),
}
model.fit(data)
``````
```1/1 [==============================] - 0s 230ms/step - loss: 1.0306 - binary_accuracy: 0.0000e+00
<keras.callbacks.History at 0x7fcc6437cd10>
```

## 可选择在层上启用序列化

``````class Linear(keras.layers.Layer):
def __init__(self, units=32):
super(Linear, self).__init__()
self.units = units

def build(self, input_shape):
shape=(input_shape[-1], self.units),
initializer="random_normal",
trainable=True,
)
shape=(self.units,), initializer="random_normal", trainable=True
)

def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b

def get_config(self):
return {"units": self.units}

# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
``````
```{'units': 64}
```

``````class Linear(keras.layers.Layer):
def __init__(self, units=32, **kwargs):
super(Linear, self).__init__(**kwargs)
self.units = units

def build(self, input_shape):
shape=(input_shape[-1], self.units),
initializer="random_normal",
trainable=True,
)
shape=(self.units,), initializer="random_normal", trainable=True
)

def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b

def get_config(self):
config = super(Linear, self).get_config()
config.update({"units": self.units})
return config

layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
``````
```{'name': 'linear_8', 'trainable': True, 'dtype': 'float32', 'units': 64}
```

``````def from_config(cls, config):
return cls(**config)
``````

## `call()` 方法中的特权 `training` 参数

``````class CustomDropout(keras.layers.Layer):
def __init__(self, rate, **kwargs):
super(CustomDropout, self).__init__(**kwargs)
self.rate = rate

def call(self, inputs, training=None):
if training:
return tf.nn.dropout(inputs, rate=self.rate)
return inputs
``````

## `call()` 方法中的特权 `mask` 参数

`call()` 支持的另一个特权参数是 `mask` 参数。

## `Model` 类

`Model` 类具有与 `Layer` 相同的 API，但有如下区别：

• 它会公开内置训练、评估和预测循环（`model.fit()``model.evaluate()``model.predict()`）。
• 它会通过 `model.layers` 属性公开其内部层的列表。
• 它会公开保存和序列化 API（`save()``save_weights()`…）

``````class ResNet(tf.keras.Model):

def __init__(self, num_classes=1000):
super(ResNet, self).__init__()
self.block_1 = ResNetBlock()
self.block_2 = ResNetBlock()
self.global_pool = layers.GlobalAveragePooling2D()
self.classifier = Dense(num_classes)

def call(self, inputs):
x = self.block_1(inputs)
x = self.block_2(x)
x = self.global_pool(x)
return self.classifier(x)

resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save(filepath)
``````

## 汇总：端到端示例

• `Layer` 封装了状态（在 `__init__()``build()` 中创建）和一些计算（在 `call()` 中定义）。
• 层可以递归嵌套以创建新的更大的计算块。
• 层可以通过 `add_loss()``add_metric()` 创建并跟踪损失（通常是正则化损失）以及指标。
• 您要训练的外部容器是 `Model``Model` 就像 `Layer`，但是添加了训练和序列化实用工具。

``````from tensorflow.keras import layers

class Sampling(layers.Layer):
"""Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

def call(self, inputs):
z_mean, z_log_var = inputs
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
return z_mean + tf.exp(0.5 * z_log_var) * epsilon

class Encoder(layers.Layer):
"""Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
super(Encoder, self).__init__(name=name, **kwargs)
self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
self.dense_mean = layers.Dense(latent_dim)
self.dense_log_var = layers.Dense(latent_dim)
self.sampling = Sampling()

def call(self, inputs):
x = self.dense_proj(inputs)
z_mean = self.dense_mean(x)
z_log_var = self.dense_log_var(x)
z = self.sampling((z_mean, z_log_var))
return z_mean, z_log_var, z

class Decoder(layers.Layer):
"""Converts z, the encoded digit vector, back into a readable digit."""

def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
super(Decoder, self).__init__(name=name, **kwargs)
self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
self.dense_output = layers.Dense(original_dim, activation="sigmoid")

def call(self, inputs):
x = self.dense_proj(inputs)
return self.dense_output(x)

class VariationalAutoEncoder(keras.Model):
"""Combines the encoder and decoder into an end-to-end model for training."""

def __init__(
self,
original_dim,
intermediate_dim=64,
latent_dim=32,
name="autoencoder",
**kwargs
):
super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
self.original_dim = original_dim
self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

def call(self, inputs):
z_mean, z_log_var, z = self.encoder(inputs)
reconstructed = self.decoder(z)
# Add KL divergence regularization loss.
kl_loss = -0.5 * tf.reduce_mean(
z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
)
return reconstructed
``````

``````original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs.
for epoch in range(epochs):
print("Start of epoch %d" % (epoch,))

# Iterate over the batches of the dataset.
for step, x_batch_train in enumerate(train_dataset):
with tf.GradientTape() as tape:
reconstructed = vae(x_batch_train)
# Compute reconstruction loss
loss = mse_loss_fn(x_batch_train, reconstructed)
loss += sum(vae.losses)  # Add KLD regularization loss

loss_metric(loss)

if step % 100 == 0:
print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
``````
```Start of epoch 0
step 0: mean loss = 0.3553
step 100: mean loss = 0.1263
step 200: mean loss = 0.0996
step 300: mean loss = 0.0894
step 400: mean loss = 0.0844
step 500: mean loss = 0.0810
step 600: mean loss = 0.0789
step 700: mean loss = 0.0772
step 800: mean loss = 0.0761
step 900: mean loss = 0.0750
Start of epoch 1
step 0: mean loss = 0.0747
step 100: mean loss = 0.0741
step 200: mean loss = 0.0736
step 300: mean loss = 0.0731
step 400: mean loss = 0.0728
step 500: mean loss = 0.0723
step 600: mean loss = 0.0720
step 700: mean loss = 0.0718
step 800: mean loss = 0.0715
step 900: mean loss = 0.0712
```

``````vae = VariationalAutoEncoder(784, 64, 32)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)
``````
```Epoch 1/2
938/938 [==============================] - 3s 2ms/step - loss: 0.0749
Epoch 2/2
938/938 [==============================] - 2s 2ms/step - loss: 0.0676
<keras.callbacks.History at 0x7fcc641e6dd0>
```

## 超越面向对象的开发：函数式 API

``````original_dim = 784
intermediate_dim = 64
latent_dim = 32

# Define encoder model.
original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")
x = layers.Dense(intermediate_dim, activation="relu")(original_inputs)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")

# Define decoder model.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")
x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)
outputs = layers.Dense(original_dim, activation="sigmoid")(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")

# Define VAE model.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae")

# Add KL divergence regularization loss.
kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)

# Train.
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
``````
```Epoch 1/3
938/938 [==============================] - 3s 2ms/step - loss: 0.0745
Epoch 2/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0677
Epoch 3/3
938/938 [==============================] - 2s 2ms/step - loss: 0.0675
<keras.callbacks.History at 0x7fc8935f78d0>
```

[{ "type": "thumb-down", "id": "missingTheInformationINeed", "label":"没有我需要的信息" },{ "type": "thumb-down", "id": "tooComplicatedTooManySteps", "label":"太复杂/步骤太多" },{ "type": "thumb-down", "id": "outOfDate", "label":"内容需要更新" },{ "type": "thumb-down", "id": "translationIssue", "label":"翻译问题" },{ "type": "thumb-down", "id": "samplesCodeIssue", "label":"Samples / code issue" },{ "type": "thumb-down", "id": "otherDown", "label":"其他" }]
[{ "type": "thumb-up", "id": "easyToUnderstand", "label":"易于理解" },{ "type": "thumb-up", "id": "solvedMyProblem", "label":"解决了我的问题" },{ "type": "thumb-up", "id": "otherUp", "label":"其他" }]