לשמור את התאריך! קלט / פלט של Google חוזר 18-20 במאי הירשם עכשיו
דף זה תורגם על ידי Cloud Translation API.
Switch to English

התאם אישית את מה שקורה ב- Model.fit

צפה ב- TensorFlow.org הפעל בגוגל קולאב צפה במקור ב- GitHub הורד מחברת

מבוא

כשאתה עושה למידה בפיקוח, אתה יכול להשתמש fit() והכל עובד בצורה חלקה.

כאשר אתה צריך לכתוב לולאת אימונים משלך מאפס, אתה יכול להשתמש ב- GradientTape ולהשתלט על כל פרט קטן.

אבל מה אם אתה זקוק לאלגוריתם אימונים מותאם אישית, אך אתה עדיין רוצה ליהנות מהתכונות הנוחות של fit() , כגון התקשרות חוזרות, תמיכה מובנית בהפצה או פיוזינג צעדים?

עיקרון הליבה של קרס הוא גילוי פרוגרסיבי של מורכבות . אתה תמיד אמור להיות מסוגל להיכנס לתהליכי עבודה ברמה נמוכה יותר באופן הדרגתי. אתה לא צריך ליפול מצוק אם הפונקציונליות ברמה הגבוהה לא תואמת בדיוק את השימוש שלך. אתה אמור להיות מסוגל להשיג יותר שליטה על הפרטים הקטנים תוך שמירה על כמות נוחה של נוחות ברמה גבוהה.

כאשר אתה צריך להתאים אישית מה fit() עושה, עליך לבטל את פונקציית שלב האימון של מחלקת Model . זו הפונקציה שנקראת fit() עבור כל אצווה של נתונים. לאחר מכן תוכל להתקשר fit() כרגיל - והוא יפעיל אלגוריתם למידה משלך.

שים לב כי דפוס זה אינו מונע ממך לבנות מודלים באמצעות ה- API הפונקציונלי. אתה יכול לעשות זאת בין אם אתה בונה מודלים Sequential , מודלים API פונקציונליים או מודלים מסווגים.

בואו נראה איך זה עובד.

להכין

דורש TensorFlow 2.2 ואילך.

import tensorflow as tf
from tensorflow import keras

דוגמה פשוטה ראשונה

נתחיל מדוגמה פשוטה:

  • אנו יוצרים מחלקה חדשה keras.Model .
  • אנחנו פשוט עוקפים את השיטה train_step(self, data) .
  • אנו מחזירים שמות מיפוי מילוניים (כולל הפסד) לערכם הנוכחי.

data טיעון הקלט הם מה שמועבר כדי להתאים כנתוני אימון:

  • אם אתה מעביר מערכים Numpy, על ידי קריאת fit(x, y, ...) , data יהיו הכפול (x, y)
  • אם אתה מעבירtf.data.Dataset , על ידי קריאה fit(dataset, ...) , data יהיו מה שמניב על ידי dataset בכל אצווה.

בגוף שיטת train_step אנו מיישמים עדכון אימונים קבוע, בדומה למה שכבר מכיר. חשוב לציין, אנו מחשבים את ההפסד באמצעות self.compiled_loss , העוטף את פונקציות או הפסדים שהועברו compile() .

באופן דומה, אנו קוראים ל self.compiled_metrics.update_state(y, y_pred) כדי לעדכן את מצב המדדים שהועברו self.metrics compile() , ואנו self.metrics תוצאות מ- self.metrics בסוף כדי לאחזר את הערך הנוכחי שלהם.

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

בואו ננסה את זה:

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 1ms/step - loss: 0.3689 - mae: 0.4870
Epoch 2/3
32/32 [==============================] - 0s 1ms/step - loss: 0.2860 - mae: 0.4282
Epoch 3/3
32/32 [==============================] - 0s 1ms/step - loss: 0.2690 - mae: 0.4159
<tensorflow.python.keras.callbacks.History at 0x7ff5c2a96748>

הולך בדרגה נמוכה יותר

באופן טבעי, אתה יכול פשוט לדלג על העברת פונקציית הפסד train_step compile() , ובמקום זאת לעשות הכל באופן ידני ב- train_step . כך גם למדדים.

הנה דוגמה ברמה נמוכה יותר, המשתמשת רק ב- compile() כדי להגדיר את כלי האופטימיזציה:

  • אנו מתחילים ביצירת מקרים Metric כדי לעקוב אחר הפסדנו וציון MAE.
  • אנו מיישמים train_step() מותאם אישית train_step() המעדכן את מצב המדדים הללו (על ידי קריאה update_state() עליהם) ואז update_state() אותם (דרך result() ) כדי להחזיר את הערך הממוצע הנוכחי שלהם, כדי שיוצג על ידי סרגל ההתקדמות ויהיה לעבור לכל התקשרות חוזרת.
  • שים לב שנצטרך להתקשר ל- reset_states() על המדדים שלנו בין כל תקופה! אחרת, שיחת result() תחזיר ממוצע מאז תחילת האימון, ואילו בדרך כלל אנו עובדים עם ממוצעים לתקופה. למרבה המזל, המסגרת יכולה לעשות זאת עבורנו: פשוט רשום כל מדד שתרצה לאפס במאפיין metrics של המודל. המודל יקרא reset_states() על כל אובייקט שמופיע כאן בתחילת כל תקופת fit() או בתחילת קריאה evaluate() .
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 1ms/step - loss: 0.7816 - mae: 0.7624
Epoch 2/5
32/32 [==============================] - 0s 1ms/step - loss: 0.3272 - mae: 0.4600
Epoch 3/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2355 - mae: 0.3884
Epoch 4/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2252 - mae: 0.3791
Epoch 5/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2214 - mae: 0.3760
<tensorflow.python.keras.callbacks.History at 0x7ff5c29225f8>

תמיכה sample_weight class_weight sample_weight class_weight

יתכן ושמת לב שהדוגמה הבסיסית הראשונה שלנו לא הזכירה שום שקלול לדוגמא. אם ברצונך לתמוך בארגומנטים fit() sample_weight משקל sample_weight class_weight , פשוט תעשה את הפעולות הבאות:

  • sample_weight data
  • העבירו אותו ל- compiled_loss & compiled_metrics (כמובן, תוכלו גם להחיל אותו באופן ידני אם אינכם סומכים על compile() להפסדים ומדדים)
  • זהו זה. זו הרשימה.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 1ms/step - loss: 0.3251 - mae: 0.7057
Epoch 2/3
32/32 [==============================] - 0s 1ms/step - loss: 0.1303 - mae: 0.4293
Epoch 3/3
32/32 [==============================] - 0s 1ms/step - loss: 0.0950 - mae: 0.3699
<tensorflow.python.keras.callbacks.History at 0x7ff5c28ae668>

מתן שלב הערכה משלך

מה אם אתה רוצה לעשות את אותו הדבר עבור שיחות ל- model.evaluate() ? אז היית עוקף את test_step בדיוק באותו אופן. כך זה נראה:

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 5.6995 - mae: 2.3246
[5.736238479614258, 2.331244945526123]

גלישה: דוגמה ל- GAN מקצה לקצה

בואו נעבור על דוגמה מקצה לקצה הממנפת את כל מה שלמדתם זה עתה.

בואו ניקח בחשבון:

  • רשת גנרטורים שנועדה ליצור תמונות 28x28x1.
  • רשת מפלה נועדה לסווג תמונות 28x28x1 לשתי מחלקות ("מזויפות" ו"אמיתיות ").
  • מיטוב אחד לכל אחד.
  • פונקציית אובדן להכשרת המפלה.
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

הנה מחלקה GAN שלמה עם תכונה, עוקפת compile() לשימוש בחתימה משלה, ומיישמת את אלגוריתם ה- GAN כולו ב- 17 שורות ב- train_step :

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

בואו נסה אותו:

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
100/100 [==============================] - 12s 11ms/step - d_loss: 0.4581 - g_loss: 0.8789
<tensorflow.python.keras.callbacks.History at 0x7ff5712ea978>

הרעיונות העומדים מאחורי למידה עמוקה הם פשוטים, אז מדוע יישומם צריך להיות כואב?