יצירת שכבות ודגמים חדשים באמצעות תת-סיווג

קל לארגן דפים בעזרת אוספים אפשר לשמור ולסווג תוכן על סמך ההעדפות שלך.

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

להכין

import tensorflow as tf
from tensorflow import keras

Layer בכיתה: שילוב של מדינה (משקולות) וכמה חישוב

אחד ההפשטה המרכזית Keras היא Layer בכיתה. שכבה מקפלת הן מצב (ה"משקלים" של השכבה) והן טרנספורמציה מכניסות ליציאות ("קריאה", מעבר קדימה של השכבה).

הנה שכבה מחוברת בצפיפות. יש לו מדינה: המשתנים w ו- b .

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        w_init = tf.random_normal_initializer()
        self.w = tf.Variable(
            initial_value=w_init(shape=(input_dim, units), dtype="float32"),
            trainable=True,
        )
        b_init = tf.zeros_initializer()
        self.b = tf.Variable(
            initial_value=b_init(shape=(units,), dtype="float32"), trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

אתה תשתמש בשכבה על ידי קריאה לה בקלט/ים מסוימים של טנסור, בדומה לפונקציית Python.

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[ 0.00962844 -0.01307489 -0.1452128   0.0538918 ]
 [ 0.00962844 -0.01307489 -0.1452128   0.0538918 ]], shape=(2, 4), dtype=float32)

הערה כי משקולות w ו- b מתבצעים מעקב אוטומטי אחרות על ידי השכבה על גבי בהיותן מוצב תכונות שכבה:

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

הערה יש לך גם גישה קיצור מהיר להוספת משקל שכבה: את add_weight() שיטה:

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b


x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[ 0.05790994  0.060931   -0.0402256  -0.09450993]
 [ 0.05790994  0.060931   -0.0402256  -0.09450993]], shape=(2, 4), dtype=float32)

לשכבות יכולות להיות משקולות שאינן ניתנות לאימון

מלבד משקולות ניתנות לאימון, ניתן להוסיף לשכבה גם משקולות שאינן ניתנות לאימון. משקולות כאלה אמורות לא להילקח בחשבון במהלך התפשטות לאחור, כאשר אתה מאמן את השכבה.

הנה איך להוסיף ולהשתמש במשקל שאינו ניתן לאימון:

class ComputeSum(keras.layers.Layer):
    def __init__(self, input_dim):
        super(ComputeSum, self).__init__()
        self.total = tf.Variable(initial_value=tf.zeros((input_dim,)), trainable=False)

    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total


x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.]
[4. 4.]

זה חלק layer.weights , אבל זה שיסווג משקל שאינו שאפשר לאלף:

print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print("trainable_weights:", my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []

שיטות עבודה מומלצות: דחיית יצירת משקל עד שצורת התשומות תהיה ידועה

שלנו Linear שכבה מעל לקחו input_dim הטיעון ששימש לחישוב צורת משקולות w ו- b ב __init__() :

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super(Linear, self).__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

במקרים רבים, ייתכן שאינך יודע מראש את גודל התשומות שלך, ותרצה ליצור משקלים בעצלתיים כאשר הערך הזה ייוודע, זמן מה לאחר הפעלת השכבה.

בשנות ה API Keras, אנו ממליצים ליצור משקולות שכבה של build(self, inputs_shape) שיטה של השכבה שלך. ככה:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

__call__() שיטה של השכבה שלך תתחדש אוטומטית לבנות בפעם הראשונה זה נקרא. כעת יש לך שכבה עצלנית ולכן קלה יותר לשימוש:

# At instantiation, we don't know on what inputs this is going to get called
linear_layer = Linear(32)

# The layer's weights are created dynamically the first time the layer is called
y = linear_layer(x)

הטמעת build() בנפרד כפי שפורטה לעיל יפה מפריד יצירת משקולות רק פעם מלהשתמש משקולות בכל שיחה. עם זאת, עבור כמה שכבות מותאמות אישית מתקדמות, זה יכול להיות בלתי מעשי להפריד בין יצירת המצב לבין החישוב. מיישמי שכבה רשאים לדחות יצירת המשקל הראשון __call__() , אבל צורך לדאוג כי שיחות מאוחר להשתמש באותו משקולות. בנוסף, מאז __call__() צפוי להיות מוצא להורג בפעם הראשונה בתוך tf.function , כל יצירה משתנה המתרחשת __call__() צריך להיות עטוף tf.init_scope .

שכבות ניתנות לחיבור רקורסיבי

אם תקצה מופע שכבה כתכונה של שכבה אחרת, השכבה החיצונית תתחיל לעקוב אחר המשקלים שנוצרו על ידי השכבה הפנימית.

אנו ממליצים ליצור sublayers כאלה __init__() שיטה ולהשאיר אותו הראשון __call__() כדי להפעיל בניית משקולות שלהם.

class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super(MLPBlock, self).__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
weights: 6
trainable weights: 6

add_loss() שיטה

כאשר כותבים את call() שיטה של שכבה, אתה יכול ליצור הפסד tensors כי אתה רוצה להשתמש מאוחר יותר, בעת כתיבת לולאה האימונים שלך. זה הוא בר ביצוע על ידי התקשרות self.add_loss(value) :

# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self, rate=1e-2):
        super(ActivityRegularizationLayer, self).__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_sum(inputs))
        return inputs

הפסדים אלה (כולל אלה שנוצרו על ידי כל השכבה הפנימית) ניתן לאחזר באמצעות layer.losses . מאפיין זה מתאפס בתחילת כל __call__() לשכבה העליונה, אך כך layer.losses תמיד מכילה את הערכים ההפסד שנוצרו במהלך המעבר האחרון קדימה.

class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super(OuterLayer, self).__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called

_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # This is the loss created during the call above

בנוסף, loss הרכוש מכיל גם הפסדים להסדרת יצר עבור המשקולות של כול שכבה פנימית:

class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self):
        super(OuterLayerWithKernelRegularizer, self).__init__()
        self.dense = keras.layers.Dense(
            32, kernel_regularizer=tf.keras.regularizers.l2(1e-3)
        )

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayerWithKernelRegularizer()
_ = layer(tf.zeros((1, 1)))

# This is `1e-3 * sum(layer.dense.kernel ** 2)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
[<tf.Tensor: shape=(), dtype=float32, numpy=0.0024520475>]

הפסדים אלו אמורים להילקח בחשבון בעת ​​כתיבת לולאות אימון, כמו זה:

# Instantiate an optimizer.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
  with tf.GradientTape() as tape:
    logits = layer(x_batch_train)  # Logits for this minibatch
    # Loss value for this minibatch
    loss_value = loss_fn(y_batch_train, logits)
    # Add extra losses created during this forward pass:
    loss_value += sum(model.losses)

  grads = tape.gradient(loss_value, model.trainable_weights)
  optimizer.apply_gradients(zip(grads, model.trainable_weights))

לקבלת מדריך מפורט על כתיבת לולאות אימונים, ראה מדריך לכתיבת לולאת הכשרה מאפסת .

הפסדים אלה גם לעבוד בצורה חלקה עם fit() (הם מקבלים סכמו באופן אוטומטי ויתווספו ההפסד העיקרי, אם בכלל):

import numpy as np

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# If there is a loss passed in `compile`, the regularization
# losses get added to it
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# It's also possible not to pass any loss in `compile`,
# since the model already has a loss to minimize, via the `add_loss`
# call during the forward pass!
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
1/1 [==============================] - 0s 209ms/step - loss: 0.1948
1/1 [==============================] - 0s 49ms/step - loss: 0.0298
<keras.callbacks.History at 0x7fce9052d290>

add_metric() שיטה

בדומה add_loss() , שכבות גם יש add_metric() שיטה למעקב אחר הממוצע הנע של כמות במהלך אימונים.

קחו בחשבון את השכבה הבאה: שכבת "נקודת קצה לוגיסטית". זה לוקח כנבואות & מטרות תשומות, והוא מחשב הפסד בהם היא עוקבת באמצעות add_loss() , והוא מחשב סקלר דיוק, שהיא עוקבת באמצעות add_metric() .

class LogisticEndpoint(keras.layers.Layer):
    def __init__(self, name=None):
        super(LogisticEndpoint, self).__init__(name=name)
        self.loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
        self.accuracy_fn = keras.metrics.BinaryAccuracy()

    def call(self, targets, logits, sample_weights=None):
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        loss = self.loss_fn(targets, logits, sample_weights)
        self.add_loss(loss)

        # Log accuracy as a metric and add it
        # to the layer using `self.add_metric()`.
        acc = self.accuracy_fn(targets, logits, sample_weights)
        self.add_metric(acc, name="accuracy")

        # Return the inference-time prediction tensor (for `.predict()`).
        return tf.nn.softmax(logits)

מדדי מעקב בדרך זו הם נגישים דרך layer.metrics :

layer = LogisticEndpoint()

targets = tf.ones((2, 2))
logits = tf.ones((2, 2))
y = layer(targets, logits)

print("layer.metrics:", layer.metrics)
print("current accuracy value:", float(layer.metrics[0].result()))
layer.metrics: [<keras.metrics.BinaryAccuracy object at 0x7fce90578490>]
current accuracy value: 1.0

בדיוק כמו עבור add_loss() , מדדים אלה נמצאים במעקב fit() :

inputs = keras.Input(shape=(3,), name="inputs")
targets = keras.Input(shape=(10,), name="targets")
logits = keras.layers.Dense(10)(inputs)
predictions = LogisticEndpoint(name="predictions")(logits, targets)

model = keras.Model(inputs=[inputs, targets], outputs=predictions)
model.compile(optimizer="adam")

data = {
    "inputs": np.random.random((3, 3)),
    "targets": np.random.random((3, 10)),
}
model.fit(data)
1/1 [==============================] - 0s 274ms/step - loss: 0.9291 - binary_accuracy: 0.0000e+00
<keras.callbacks.History at 0x7fce90448c50>

אתה יכול לאפשר אופציונלי סדרה בשכבות שלך

אם אתם זקוקים שכבות מותאמות אישית שלך להיות serializable כחלק מודל פונקציונאלי , אתה יכול לחלופין ליישם get_config() שיטה:

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {"units": self.units}


# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}

הערה כי __init__() שיטה של הבסיס Layer בכיתה לוקח כמה טיעונים למילות מפתח, בפרט name וגם dtype . זהו תרגול טוב כדי לעבור ויכוחים אלה מעמד ההורה ב __init__() ולכלול אותם config השכבה:

class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super(Linear, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super(Linear, self).get_config()
        config.update({"units": self.units})
        return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_8', 'trainable': True, 'dtype': 'float32', 'units': 64}

אם אתה צריך יותר גמישות כאשר deserializing השכבה מ config שלו, אתה יכול גם לעקוף את from_config() שיטה בכיתה. זהו יישום הבסיס של from_config() :

def from_config(cls, config):
  return cls(**config)

כדי ללמוד עוד על בהמשכים ושמירה, לראות את השלם מדריך להצלת ו בהמשכים דגמים .

Privileged training הטיעון של call() שיטה

שכבות מסוימות, ובעיקר BatchNormalization השכבה ואת Dropout השכבה, יש התנהגויות שונות במהלך אימון והסקה. עבור שכבות כאלה, זה מקובל לחשוף training טיעון (בוליאני) ב call() השיטה.

על ידי חשיפת טיעון זה call() , תפעיל מובנה לולאות הכשרה והערכה (למשל fit() ) כדי להשתמש בצורה נכונה את השכבה באימוני היקש.

class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super(CustomDropout, self).__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

Privileged mask טיעון של call() שיטה

הטענה החסויה האחרת הנתמכות על ידי call() היא mask הטיעון.

תוכלו למצוא אותו בכל שכבות Keras RNN. מסיכה היא טנזור בוליאני (ערך בוליאני אחד לכל צעד זמן בקלט) המשמש כדי לדלג על שלבי זמן מסוימים של קלט בעת עיבוד נתוני סדרות זמן.

Keras יועבר נכון mask הטיעון כדי __call__() עבור שכבות שתומכות בה, כאשר מסכה נוצרת על ידי שכבה מוקדמת. מסכה מניבת שכבות הם Embedding השכבה מוגדרת עם mask_zero=True , ואת Masking השכבה.

כדי ללמוד עוד על מיסוך ואיך לכתוב מיסוך מאופשר שכבות, אנא בדוק את המדריך "הבנת ריפוד מיסוך" .

Model בכיתה

באופן כללי, תוכל להשתמש Layer בכיתה להגדיר בלוקי חישוב פנימיים, ישתמש Model בכיתה להגדיר את המודל החיצוני - האובייקט תוכל להתאמן.

למשל, ב מודל ResNet50, היית צריך כמה בלוקים ResNet subclassing Layer , וכן יחיד Model המקיף ברשת ResNet50 כולו.

Model הכיתה יש את אותה API כמו Layer , עם ההבדלים הבאים:

  • היא חושפת מובנה אימונים, הערכה, ולולאות חיזוי ( model.fit() , model.evaluate() , model.predict() ).
  • הוא חושף את רשימת השכבות שלה הפנימיות, דרך model.layers הרכוש.
  • זה חושף חיסכון ו- APIs בהמשכים ( save() , save_weights() ...)

הלכה למעשה, Layer תואמת בכיתה למה שאנחנו מתייחסים בספרות כאל "השכבה" (כמו "שכבת פיתול" או "שכבה חוזרת") או בתור "בלוק" (כמו "ResNet הבלוק" או "Inception הבלוק" ).

בינתיים, Model תואם הכיתה מה שמכונה בספרות בתור "מודל" (כמו "מודל למידה עמוק") או בתור "רשת" (כמו "רשת עצבית עמוקה").

אז אם אתם תוהים, "אני צריך להשתמש Layer בכיתה או Model בכיתה?", שאל את עצמך: האם אני צריך להתקשר fit() על זה? האם אני צריך להתקשר save() על זה? אם כן, ללכת עם Model . אם לא (אם משום המעמד שלך הוא רק בלוק במערכת גדולה, או בגלל שאתה כותב אימונים & חיסכון הקוד בעצמך), שימוש Layer .

למשל, היינו יכולים לקחת דוגמה מיני-resnet שלנו לעיל, ולהשתמש בו כדי לבנות Model שנוכל להתאמן עם fit() , וכי נוכל לחסוך עם save_weights() :

class ResNet(tf.keras.Model):

    def __init__(self, num_classes=1000):
        super(ResNet, self).__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save(filepath)

מחברים את הכל ביחד: דוגמה מקצה לקצה

הנה מה שלמדת עד כה:

  • Layer לתמצת מדינה (שנוצר __init__() או build() ) וחלקם החישוב (כהגדרתם call() ).
  • ניתן לקנן שכבות רקורסיבית כדי ליצור בלוקי חישוב חדשים וגדולים יותר.
  • שכבות יכול ליצור הפסדים המסלול (הפסדים ההסדרה בדרך כלל), וכן ערכים, באמצעות add_loss() ו add_metric()
  • מיכל החיצוני, הדבר שאתה רוצה לאמן, הוא Model . Model הוא בדיוק כמו Layer , אבל עם שירות הדרכה בהמשכים הוסיף.

בואו נחבר את כל הדברים האלה יחד לדוגמא מקצה לקצה: אנחנו הולכים ליישם קודן אוטומטי וריאציוני (VAE). נאמן אותו על ספרות MNIST.

שלנו VAE יהיה תת מחלקה של Model , בנוי כמו קומפוזיציה מקוננות של שכבות כי תת Layer . זה יכלול אובדן רגוליזציה (סטיית KL).

from tensorflow.keras import layers


class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)
        return reconstructed

בואו נכתוב לולאת אימון פשוטה ב-MNIST:

original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()

loss_metric = tf.keras.metrics.Mean()

(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs.
for epoch in range(epochs):
    print("Start of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
Start of epoch 0
step 0: mean loss = 0.3184
step 100: mean loss = 0.1252
step 200: mean loss = 0.0989
step 300: mean loss = 0.0890
step 400: mean loss = 0.0841
step 500: mean loss = 0.0807
step 600: mean loss = 0.0787
step 700: mean loss = 0.0771
step 800: mean loss = 0.0759
step 900: mean loss = 0.0749
Start of epoch 1
step 0: mean loss = 0.0746
step 100: mean loss = 0.0740
step 200: mean loss = 0.0735
step 300: mean loss = 0.0730
step 400: mean loss = 0.0727
step 500: mean loss = 0.0723
step 600: mean loss = 0.0720
step 700: mean loss = 0.0717
step 800: mean loss = 0.0715
step 900: mean loss = 0.0712

שים לב מאז VAE הוא subclassing Model , הוא כולל מובנית לולאות אימונים. אז אתה יכול גם לאמן את זה ככה:

vae = VariationalAutoEncoder(784, 64, 32)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)
Epoch 1/2
938/938 [==============================] - 3s 3ms/step - loss: 0.0745
Epoch 2/2
938/938 [==============================] - 3s 3ms/step - loss: 0.0676
<keras.callbacks.History at 0x7fce90282750>

מעבר לפיתוח מונחה עצמים: ה-API הפונקציונלי

האם הדוגמה הזו הייתה יותר מדי פיתוח מונחה עצמים עבורך? ניתן גם לבנות מודלים באמצעות API פונקציונלית . חשוב לציין, בחירה בסגנון כזה או אחר לא מונעת מכם למנף רכיבים שנכתבו בסגנון האחר: תמיד תוכלו לערבב.

למשל, את דוגמא API פונקציונלי מתחת שימוש חוזר באותו Sampling השכבה הגדירו בדוגמא לעיל:

original_dim = 784
intermediate_dim = 64
latent_dim = 32

# Define encoder model.
original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")
x = layers.Dense(intermediate_dim, activation="relu")(original_inputs)
z_mean = layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")

# Define decoder model.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")
x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)
outputs = layers.Dense(original_dim, activation="sigmoid")(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")

# Define VAE model.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae")

# Add KL divergence regularization loss.
kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)

# Train.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
Epoch 1/3
938/938 [==============================] - 3s 3ms/step - loss: 0.0748
Epoch 2/3
938/938 [==============================] - 3s 3ms/step - loss: 0.0676
Epoch 3/3
938/938 [==============================] - 3s 3ms/step - loss: 0.0676
<keras.callbacks.History at 0x7fce90233cd0>

לקבלת מידע נוסף, הקפד לקרוא את המדריך API פונקציונלית .