ML Community Day คือวันที่ 9 พฤศจิกายน! ร่วมกับเราสำหรับการปรับปรุงจาก TensorFlow, JAX และอื่น ๆ เรียนรู้เพิ่มเติม

การสร้างเลเยอร์และโมเดลใหม่ผ่านคลาสย่อย

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

ติดตั้ง

import tensorflow as tf
from tensorflow import keras

Layer ชั้น: การรวมกันของรัฐ (น้ำหนัก) และการคำนวณบาง

หนึ่งในสิ่งที่เป็นนามธรรมกลางใน Keras เป็น Layer ชั้น เลเยอร์ห่อหุ้มทั้งสถานะ ("น้ำหนัก" ของเลเยอร์) และการแปลงจากอินพุตเป็นเอาต์พุต ("การเรียก" การส่งต่อของเลเยอร์)

นี่คือเลเยอร์ที่เชื่อมต่อกันอย่างหนาแน่น แต่ก็มีรัฐ: ตัวแปร w และ b

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(
            initial_value=w_init(shape=(input_dim, units), dtype="float32"),
            trainable=True,
        )
        b_init = tf.zeros_initializer()
        self.b = tf.Variable(
            initial_value=b_init(shape=(units,), dtype="float32"), trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

คุณจะใช้เลเยอร์โดยเรียกใช้มันบนอินพุตเทนเซอร์ เหมือนกับฟังก์ชัน Python

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[ 0.03029768 -0.05972501  0.00586849 -0.1109921 ]
 [ 0.03029768 -0.05972501  0.00586849 -0.1109921 ]], shape=(2, 4), dtype=float32)

โปรดทราบว่าน้ำหนัก w และ b มีการติดตามโดยอัตโนมัติโดยชั้นบนเป็นชุดแอตทริบิวต์ชั้น:

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

หมายเหตุคุณยังมีการเข้าถึงทางลัดที่รวดเร็วสำหรับการเพิ่มน้ำหนักให้กับชั้นที่: add_weight() วิธีการ:

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b


x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[ 0.05004499  0.01081884 -0.12212791  0.01023131]
 [ 0.05004499  0.01081884 -0.12212791  0.01023131]], shape=(2, 4), dtype=float32)

เลเยอร์สามารถมีน้ำหนักที่ไม่สามารถฝึกได้

นอกจากตุ้มน้ำหนักที่ฝึกได้ คุณสามารถเพิ่มตุ้มน้ำหนักที่ไม่สามารถฝึกได้ลงในเลเยอร์ได้เช่นกัน น้ำหนักดังกล่าวไม่ควรนำมาพิจารณาในระหว่างการขยายพันธุ์ด้านหลัง เมื่อคุณฝึกเลเยอร์

วิธีเพิ่มและใช้ตุ้มน้ำหนักที่ไม่สามารถฝึกได้มีดังนี้

class ComputeSum(keras.layers.Layer):
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        self.total = tf.Variable(initial_value=tf.zeros((input_dim,)), trainable=False)

    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total


x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.]
[4. 4.]

มันเป็นส่วนหนึ่งของ layer.weights แต่จะได้รับการจัดหมวดหมู่เป็นน้ำหนักที่ไม่ใช่สุวินัย:

print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print("trainable_weights:", my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []

แนวปฏิบัติที่ดีที่สุด: ชะลอการสร้างน้ำหนักจนกว่าจะทราบรูปร่างของปัจจัยการผลิต

ของเรา Linear ชั้นข้างต้นเอา input_dim อาร์กิวเมนต์ที่ใช้ในการคำนวณรูปทรงของน้ำหนักที่ w และ b ใน __init__() :

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

ในหลายกรณี คุณอาจไม่ทราบขนาดอินพุตล่วงหน้า และคุณต้องการสร้างน้ำหนักอย่างเกียจคร้านเมื่อทราบค่านั้น หลังจากสร้างเลเยอร์แล้ว

ใน Keras API เราขอแนะนำให้สร้างน้ำหนักชั้นใน build(self, inputs_shape) วิธีการของชั้นของคุณ แบบนี้:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

__call__() วิธีการของชั้นของคุณจะทำงานสร้างโดยอัตโนมัติเป็นครั้งแรกมันถูกเรียกว่า ตอนนี้คุณมีเลเยอร์ที่ขี้เกียจและใช้งานง่ายขึ้น:

# At instantiation, we don't know on what inputs this is going to get called
linear_layer = Linear(32)

# The layer's weights are created dynamically the first time the layer is called
y = linear_layer(x)

เลเยอร์สามารถเขียนซ้ำได้

หากคุณกำหนดอินสแตนซ์ของเลเยอร์เป็นแอตทริบิวต์ของเลเยอร์อื่น เลเยอร์ภายนอกจะเริ่มติดตามน้ำหนักของเลเยอร์ภายใน

เราขอแนะนำให้สร้าง sublayers ดังกล่าวใน __init__() วิธีการ (ตั้งแต่ sublayers มักจะมีวิธีการสร้างที่พวกเขาจะถูกสร้างขึ้นเมื่อชั้นนอกได้รับการสร้าง)

# Let's assume we are reusing the Linear class
# with a `build` method that we defined above.


class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super(MLPBlock, self).__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
weights: 6
trainable weights: 6

add_loss() วิธีการ

เมื่อเขียน call() วิธีการของชั้นที่คุณสามารถสร้างเทนเซอร์สูญเสียที่คุณจะต้องการที่จะใช้ในภายหลังเมื่อเขียนห่วงฝึกอบรมของคุณ นี้เป็น doable โดยการเรียก self.add_loss(value) :

# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs

สูญเสียเหล่านี้ (รวมทั้งที่สร้างขึ้นโดยชั้นใด ๆ ) สามารถเรียกดูได้ผ่านทาง layer.losses สถานที่แห่งนี้ถูกตั้งค่าใหม่ที่เริ่มต้นของทุก __call__() ชั้นระดับบนสุดเพื่อให้ layer.losses เสมอมีค่าการสูญเสียที่เกิดขึ้นระหว่างไปข้างหน้าผ่านที่ผ่านมา

class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called

_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # This is the loss created during the call above

นอกจากนี้ loss ทรัพย์สินนอกจากนี้ยังมีการสูญเสียกูสร้างขึ้นสำหรับน้ำหนักของชั้นใด ๆ :

class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self):
        super(OuterLayerWithKernelRegularizer, self).__init__()
        self.dense = keras.layers.Dense(
            32, kernel_regularizer=tf.keras.regularizers.l2(1e-3)
        )

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayerWithKernelRegularizer()
_ = layer(tf.zeros((1, 1)))

# This is `1e-3 * sum(layer.dense.kernel ** 2)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
[<tf.Tensor: shape=(), dtype=float32, numpy=0.0016696099>]

การสูญเสียเหล่านี้มีขึ้นเพื่อนำมาพิจารณาเมื่อเขียนลูปการฝึกเช่นนี้:

# Instantiate an optimizer.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
  with tf.GradientTape() as tape:
    logits = layer(x_batch_train)  # Logits for this minibatch
    # Loss value for this minibatch
    loss_value = loss_fn(y_batch_train, logits)
    # Add extra losses created during this forward pass:
    loss_value += sum(model.losses)

  grads = tape.gradient(loss_value, model.trainable_weights)
  optimizer.apply_gradients(zip(grads, model.trainable_weights))

เพื่อดูรายละเอียดเกี่ยวกับการเขียนลูปการฝึกอบรมดู คู่มือการเขียนห่วงการฝึกอบรมจากรอยขีดข่วน

ความสูญเสียเหล่านี้ยังทำงานต่อเนื่องกับ fit() (พวกเขาได้รับการสรุปโดยอัตโนมัติและเพิ่มการสูญเสียหลักถ้ามี):

import numpy as np

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# If there is a loss passed in `compile`, the regularization
# losses get added to it
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# It's also possible not to pass any loss in `compile`,
# since the model already has a loss to minimize, via the `add_loss`
# call during the forward pass!
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
1/1 [==============================] - 0s 103ms/step - loss: 0.3750
1/1 [==============================] - 0s 44ms/step - loss: 0.0230
<keras.callbacks.History at 0x7fd0f80b3290>

add_metric() วิธีการ

ในทำนองเดียวกันกับ add_loss() ชั้นยังมี add_metric() วิธีการสำหรับการติดตามค่าเฉลี่ยเคลื่อนที่ของปริมาณระหว่างการฝึกอบรม

พิจารณาเลเยอร์ต่อไปนี้: เลเยอร์ "logistic endpoint" มันต้องใช้เวลาเป็นปัจจัยการผลิตการคาดการณ์และเป้าหมายก็คำนวณการสูญเสียที่จะติดตามผ่านทาง add_loss() และคำนวณเกลาความถูกต้องซึ่งจะติดตามผ่าน add_metric()

class LogisticEndpoint(keras.layers.Layer):
    def __init__(self, name=None):
        super(LogisticEndpoint, self).__init__(name=name)
        self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
        self.accuracy_fn = keras.metrics.BinaryAccuracy()

    def call(self, targets, logits, sample_weights=None):
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        loss = self.loss_fn(targets, logits, sample_weights)
        self.add_loss(loss)

        # Log accuracy as a metric and add it
        # to the layer using `self.add_metric()`.
        acc = self.accuracy_fn(targets, logits, sample_weights)
        self.add_metric(acc, name="accuracy")

        # Return the inference-time prediction tensor (for `.predict()`).
        return tf.nn.softmax(logits)

ตัวชี้วัดการติดตามในลักษณะนี้สามารถเข้าถึงได้ผ่าน layer.metrics :

layer = LogisticEndpoint()

targets = tf.ones((2, 2))
logits = tf.ones((2, 2))
y = layer(targets, logits)

print("layer.metrics:", layer.metrics)
print("current accuracy value:", float(layer.metrics[0].result()))
layer.metrics: [<keras.metrics.BinaryAccuracy object at 0x7fd1b0214810>]
current accuracy value: 1.0

เช่นเดียวสำหรับ add_loss() ตัวชี้วัดเหล่านี้จะถูกติดตามโดย fit() :

inputs = keras.Input(shape=(3,), name="inputs")
targets = keras.Input(shape=(10,), name="targets")
logits = keras.layers.Dense(10)(inputs)
predictions = LogisticEndpoint(name="predictions")(logits, targets)

model = keras.Model(inputs=[inputs, targets], outputs=predictions)
model.compile(optimizer="adam")

data = {
    "inputs": np.random.random((3, 3)),
    "targets": np.random.random((3, 10)),
}
model.fit(data)
1/1 [==============================] - 0s 242ms/step - loss: 0.9954 - binary_accuracy: 0.0000e+00
<keras.callbacks.History at 0x7fd0f80b3110>

คุณสามารถเลือกเปิดใช้งานการทำให้เป็นอนุกรมบนเลเยอร์ของคุณ

หากคุณต้องการชั้นที่กำหนดเองของคุณจะ serializable เป็นส่วนหนึ่งของ รูปแบบการทำงาน คุณสามารถเลือกใช้ get_config() วิธีการ:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {"units": self.units}


# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}

หมายเหตุว่า __init__() วิธีการของฐาน Layer ชั้นใช้อาร์กิวเมนต์บางคำในโดยเฉพาะ name และ dtype มันเป็นวิธีที่ดีที่จะผ่านการขัดแย้งเหล่านี้ไปยังผู้ปกครองชั้นใน __init__() และเพื่อรวมไว้ในการตั้งค่าชั้น:

class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super(Linear, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super(Linear, self).get_config()
        config.update({"units": self.units})
        return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_8', 'trainable': True, 'dtype': 'float32', 'units': 64}

หากคุณต้องการความยืดหยุ่นมากขึ้นเมื่อ deserializing ชั้นจากการตั้งค่าของคุณยังสามารถแทนที่ from_config() วิธีการเรียน นี่คือการดำเนินงานที่ฐานของ from_config() :

def from_config(cls, config):
  return cls(**config)

ต้องการเรียนรู้เพิ่มเติมเกี่ยวกับอนุกรมและบันทึกให้ดูสมบูรณ์ คู่มือเพื่อการออมและการ serializing รุ่น

ได้รับการยกเว้น training การโต้แย้งใน call() วิธีการ

ชั้นบางโดยเฉพาะใน BatchNormalization ชั้นและ Dropout ชั้นมีพฤติกรรมที่แตกต่างกันระหว่างการฝึกอบรมและการอนุมาน สำหรับชั้นดังกล่าวเป็นมาตรฐานการปฏิบัติจะเปิดเผย training (บูล) อาร์กิวเมนต์ใน call() วิธีการ

โดยการเปิดเผยเรื่องนี้ในการ call() , คุณเปิดใช้งานตัวในการฝึกอบรมและการประเมินผลลูป (เช่น fit() ) อย่างถูกต้องใช้ชั้นในการฝึกอบรมและการอนุมาน

class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

ได้รับการยกเว้น mask โต้แย้งใน call() วิธีการ

อาร์กิวเมนต์ที่มีสิทธิพิเศษอื่น ๆ ได้รับการสนับสนุนโดยการ call() เป็น mask โต้แย้ง

คุณจะพบมันในเลเยอร์ Keras RNN ทั้งหมด มาสก์คือเทนเซอร์บูลีน (ค่าบูลีนหนึ่งค่าต่อไทม์สเต็ปในอินพุต) ใช้เพื่อข้ามขั้นตอนเวลาอินพุตบางรายการเมื่อประมวลผลข้อมูลอนุกรมเวลา

Keras จะผ่านการถูกต้องโดยอัตโนมัติ mask อาร์กิวเมนต์ __call__() สำหรับชั้นที่สนับสนุนมันเมื่อหน้ากากถูกสร้างขึ้นโดยชั้นก่อน ชั้นหน้ากากสร้างเป็น Embedding ชั้นกำหนดค่าด้วย mask_zero=True และ Masking ชั้น

ต้องการเรียนรู้เพิ่มเติมเกี่ยวกับการหลอกลวงและวิธีการเขียนชั้นกาวที่เปิดใช้งานโปรดตรวจสอบคู่มือ "ทำความเข้าใจ padding และกำบัง"

Model ระดับ

โดยทั่วไปแล้วคุณจะใช้ Layer ชั้นจะกำหนดบล็อกการคำนวณภายในและจะใช้ Model ระดับในการกำหนดรูปแบบนอก - วัตถุที่คุณจะฝึก

ยกตัวอย่างเช่นในรูปแบบ ResNet50 คุณจะมีบล็อก RESNET หลาย subclassing Layer และเดี่ยว Model ครอบคลุมเครือข่าย ResNet50 ทั้งหมด

Model ระดับมี API เช่นเดียวกับ Layer มีความแตกต่างต่อไปนี้:

  • มันเสี่ยงตัวในการฝึกอบรมและการประเมินผลและการทำนายลูป ( model.fit() , model.evaluate() , model.predict() )
  • มันหมายความว่ารายชื่อของชั้นด้านของตนผ่านทาง model.layers คุณสมบัติ
  • มันหมายความว่าประหยัดและ API อนุกรม ( save() , save_weights() ... )

ได้อย่างมีประสิทธิภาพที่ Layer ระดับที่สอดคล้องกับสิ่งที่เราอ้างถึงในวรรณคดีเป็น "ชั้น" (ในขณะที่ "บิดชั้น" หรือ "ชั้นกำเริบ") หรือเป็น "บล็อก" (ในขณะที่ "RESNET บล็อก" หรือ "บล็อก Inception" ).

ในขณะที่ Model ระดับสอดคล้องกับสิ่งที่เรียกว่าในวรรณคดีเป็น "รูปแบบ" (ในขณะที่ "รูปแบบการเรียนรู้ลึก") หรือเป็น "เครือข่าย" (ในขณะที่ "เครือข่ายประสาทลึก")

ดังนั้นหากคุณกำลังสงสัยว่า "ฉันควรจะใช้ Layer ชั้นหรือ Model คลาส?" ถามตัวเอง: ฉันจะต้องโทร fit() มันได้หรือไม่ ฉันจะต้องเรียก save() มันได้หรือไม่ ถ้าเป็นเช่นนั้นไปกับ Model ถ้าไม่ได้ (เพราะทั้งชั้นเรียนของคุณเป็นเพียงบล็อกในระบบที่ใหญ่กว่าหรือเพราะคุณเขียนการฝึกอบรมและการบันทึกรหัสตัวเอง) ใช้ Layer

ตัวอย่างเช่นเราอาจจะใช้ตัวอย่างเช่นมินิ RESNET ข้างต้นของเราและใช้มันเพื่อสร้าง Model ที่เราจะได้ฝึกกับ fit() และที่เราจะสามารถประหยัดกับ save_weights() :

class ResNet(tf.keras.Model):

    def __init__(self, num_classes=1000):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save(filepath)

นำทุกอย่างมารวมกัน: ตัวอย่างจากต้นทางถึงปลายทาง

นี่คือสิ่งที่คุณได้เรียนรู้ไปแล้ว:

  • Layer แค็ปซูลรัฐ (ที่สร้างขึ้นใน __init__() หรือ build() ) และการคำนวณบางคน (ที่กำหนดไว้ใน call() )
  • สามารถซ้อนเลเยอร์ซ้ำๆ เพื่อสร้างบล็อกการคำนวณใหม่ที่ใหญ่ขึ้นได้
  • ชั้นสามารถสร้างและการสูญเสียการติดตาม (โดยทั่วไปแล้วการสูญเสียกู) เช่นเดียวกับตัวชี้วัดที่ผ่าน add_loss() และ add_metric()
  • ภาชนะนอกสิ่งที่คุณต้องการในการฝึกอบรมเป็น Model Model เป็นเช่นเดียวกับ Layer แต่ด้วยการฝึกอบรมเพิ่มและสาธารณูปโภคอนุกรม

มารวมสิ่งเหล่านี้เข้าด้วยกันในตัวอย่างแบบ end-to-end: เราจะใช้ Variational AutoEncoder (VAE) เราจะฝึกตามหลัก MNIST

VAE ของเราจะเป็น subclass ของ Model ที่สร้างขึ้นเป็นองค์ประกอบที่ซ้อนกันของชั้นที่ subclass Layer มันจะมีการสูญเสียการทำให้เป็นมาตรฐาน (KL divergence)

from tensorflow.keras import layers


class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)
        return reconstructed

มาเขียนวงจรการฝึกอย่างง่ายใน MNIST:

original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs.
for epoch in range(epochs):
    print("Start of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
Start of epoch 0
step 0: mean loss = 0.3729
step 100: mean loss = 0.1265
step 200: mean loss = 0.0996
step 300: mean loss = 0.0895
step 400: mean loss = 0.0844
step 500: mean loss = 0.0810
step 600: mean loss = 0.0789
step 700: mean loss = 0.0773
step 800: mean loss = 0.0761
step 900: mean loss = 0.0751
Start of epoch 1
step 0: mean loss = 0.0748
step 100: mean loss = 0.0741
step 200: mean loss = 0.0736
step 300: mean loss = 0.0731
step 400: mean loss = 0.0728
step 500: mean loss = 0.0724
step 600: mean loss = 0.0721
step 700: mean loss = 0.0718
step 800: mean loss = 0.0715
step 900: mean loss = 0.0713

หมายเหตุว่าตั้งแต่ VAE เป็น subclassing Model มันมีในตัวลูปการฝึกอบรม ดังนั้นคุณสามารถฝึกได้ดังนี้:

vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)
Epoch 1/2
938/938 [==============================] - 3s 3ms/step - loss: 0.0747
Epoch 2/2
938/938 [==============================] - 2s 3ms/step - loss: 0.0676
<keras.callbacks.History at 0x7fcd2ad6bed0>

นอกเหนือจากการพัฒนาเชิงวัตถุ: Functional API

ตัวอย่างนี้มีการพัฒนาเชิงวัตถุมากเกินไปสำหรับคุณหรือไม่? นอกจากนี้คุณยังสามารถสร้างแบบจำลองโดยใช้ API ของฟังก์ชั่น ที่สำคัญ การเลือกรูปแบบใดรูปแบบหนึ่งไม่ได้ป้องกันคุณจากการใช้ประโยชน์จากส่วนประกอบที่เขียนในรูปแบบอื่น: คุณสามารถมิกซ์แอนด์แมทช์ได้เสมอ

ยกตัวอย่างเช่นตัวอย่างเช่น API ฟังก์ชั่นด้านล่าง reuses เดียวกัน Sampling ชั้นที่เรากำหนดไว้ในตัวอย่างข้างต้น:

original_dim = 784
intermediate_dim = 64
latent_dim = 32

# Define encoder model.
original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")
x = layers.Dense(intermediate_dim, activation="relu")(original_inputs)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")

# Define decoder model.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")
x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)
outputs = layers.Dense(original_dim, activation="sigmoid")(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")

# Define VAE model.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae")

# Add KL divergence regularization loss.
kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

# Train.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
Epoch 1/3
938/938 [==============================] - 3s 3ms/step - loss: 0.0749
Epoch 2/3
938/938 [==============================] - 2s 3ms/step - loss: 0.0676
Epoch 3/3
938/938 [==============================] - 2s 3ms/step - loss: 0.0675
<keras.callbacks.History at 0x7fcd2b3a37d0>

สำหรับข้อมูลเพิ่มเติมให้แน่ใจว่าจะอ่าน คู่มือ API ฟังก์ชั่น