Ayuda a proteger la Gran Barrera de Coral con TensorFlow en Kaggle Únete Challenge

Introducción a los codificadores automáticos

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Este tutorial presenta los codificadores automáticos con tres ejemplos: conceptos básicos, eliminación de ruido de imágenes y detección de anomalías.

Un autoencoder es un tipo especial de red neuronal que está entrenada para copiar su entrada a su salida. Por ejemplo, dada una imagen de un dígito escrito a mano, un codificador automático primero codifica la imagen en una representación latente de menor dimensión, luego decodifica la representación latente de nuevo a una imagen. Un codificador automático aprende a comprimir los datos mientras minimiza el error de reconstrucción.

Para obtener más información sobre autoencoders, por favor considere leer el capítulo 14 de profundo aprendizaje por Ian Goodfellow, Yoshua Bengio, y Aaron Courville.

Importar TensorFlow y otras bibliotecas

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model

Cargar el conjunto de datos

Para comenzar, entrenará el codificador automático básico utilizando el conjunto de datos Fashon MNIST. Cada imagen de este conjunto de datos tiene 28 x 28 píxeles.

(x_train, _), (x_test, _) = fashion_mnist.load_data()

x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

print (x_train.shape)
print (x_test.shape)
(60000, 28, 28)
(10000, 28, 28)

Primer ejemplo: codificador automático básico

Resultados básicos del codificador automático

Definir una autoencoder con dos capas densas: un encoder , que comprime las imágenes en un vector latente dimensional 64, y un decoder , que reconstruye la imagen original desde el espacio latente.

Para definir su modelo, utilice la API Keras Modelo subclases .

latent_dim = 64 

class Autoencoder(Model):
  def __init__(self, latent_dim):
    super(Autoencoder, self).__init__()
    self.latent_dim = latent_dim   
    self.encoder = tf.keras.Sequential([
      layers.Flatten(),
      layers.Dense(latent_dim, activation='relu'),
    ])
    self.decoder = tf.keras.Sequential([
      layers.Dense(784, activation='sigmoid'),
      layers.Reshape((28, 28))
    ])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = Autoencoder(latent_dim)
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

Entrenar el modelo usando x_train como tanto la entrada y la diana. El encoder aprenderá a comprimir el conjunto de datos de 784 dimensiones para el espacio latente y el decoder aprenderá a reconstruir las imágenes originales. .

autoencoder.fit(x_train, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test, x_test))
Epoch 1/10
1875/1875 [==============================] - 3s 1ms/step - loss: 0.0237 - val_loss: 0.0131
Epoch 2/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0115 - val_loss: 0.0105
Epoch 3/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0100 - val_loss: 0.0097
Epoch 4/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0094 - val_loss: 0.0095
Epoch 5/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0092 - val_loss: 0.0091
Epoch 6/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0090 - val_loss: 0.0091
Epoch 7/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0089 - val_loss: 0.0090
Epoch 8/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0088 - val_loss: 0.0089
Epoch 9/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0088 - val_loss: 0.0089
Epoch 10/10
1875/1875 [==============================] - 2s 1ms/step - loss: 0.0087 - val_loss: 0.0088
<tensorflow.python.keras.callbacks.History at 0x7ff99c0d3a10>

Ahora que el modelo está entrenado, probémoslo codificando y decodificando imágenes del conjunto de prueba.

encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
  # display original
  ax = plt.subplot(2, n, i + 1)
  plt.imshow(x_test[i])
  plt.title("original")
  plt.gray()
  ax.get_xaxis().set_visible(False)
  ax.get_yaxis().set_visible(False)

  # display reconstruction
  ax = plt.subplot(2, n, i + 1 + n)
  plt.imshow(decoded_imgs[i])
  plt.title("reconstructed")
  plt.gray()
  ax.get_xaxis().set_visible(False)
  ax.get_yaxis().set_visible(False)
plt.show()

png

Segundo ejemplo: eliminación de ruido de imágenes

Resultados de eliminación de ruido de imágenes

También se puede entrenar un codificador automático para eliminar el ruido de las imágenes. En la siguiente sección, creará una versión ruidosa del conjunto de datos Fashion MNIST aplicando ruido aleatorio a cada imagen. A continuación, entrenará un codificador automático utilizando la imagen ruidosa como entrada y la imagen original como destino.

Volvamos a importar el conjunto de datos para omitir las modificaciones realizadas anteriormente.

(x_train, _), (x_test, _) = fashion_mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

print(x_train.shape)
(60000, 28, 28, 1)

Añadiendo ruido aleatorio a las imágenes

noise_factor = 0.2
x_train_noisy = x_train + noise_factor * tf.random.normal(shape=x_train.shape) 
x_test_noisy = x_test + noise_factor * tf.random.normal(shape=x_test.shape) 

x_train_noisy = tf.clip_by_value(x_train_noisy, clip_value_min=0., clip_value_max=1.)
x_test_noisy = tf.clip_by_value(x_test_noisy, clip_value_min=0., clip_value_max=1.)

Trace las imágenes ruidosas.

n = 10
plt.figure(figsize=(20, 2))
for i in range(n):
    ax = plt.subplot(1, n, i + 1)
    plt.title("original + noise")
    plt.imshow(tf.squeeze(x_test_noisy[i]))
    plt.gray()
plt.show()

png

Definir un codificador automático convolucional

En este ejemplo, se entrenará un autoencoder convolucional utilizando Conv2D capas en el encoder , y Conv2DTranspose capas en el decoder .

class Denoise(Model):
  def __init__(self):
    super(Denoise, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Input(shape=(28, 28, 1)),
      layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),
      layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)])

    self.decoder = tf.keras.Sequential([
      layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
      layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
      layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = Denoise()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
autoencoder.fit(x_train_noisy, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test_noisy, x_test))
Epoch 1/10
1875/1875 [==============================] - 6s 3ms/step - loss: 0.0165 - val_loss: 0.0100
Epoch 2/10
1875/1875 [==============================] - 5s 3ms/step - loss: 0.0094 - val_loss: 0.0089
Epoch 3/10
1875/1875 [==============================] - 5s 3ms/step - loss: 0.0083 - val_loss: 0.0081
Epoch 4/10
1875/1875 [==============================] - 5s 3ms/step - loss: 0.0078 - val_loss: 0.0077
Epoch 5/10
1875/1875 [==============================] - 5s 3ms/step - loss: 0.0076 - val_loss: 0.0075
Epoch 6/10
1875/1875 [==============================] - 5s 2ms/step - loss: 0.0074 - val_loss: 0.0074
Epoch 7/10
1875/1875 [==============================] - 5s 2ms/step - loss: 0.0073 - val_loss: 0.0073
Epoch 8/10
1875/1875 [==============================] - 5s 2ms/step - loss: 0.0072 - val_loss: 0.0073
Epoch 9/10
1875/1875 [==============================] - 5s 3ms/step - loss: 0.0071 - val_loss: 0.0072
Epoch 10/10
1875/1875 [==============================] - 5s 3ms/step - loss: 0.0071 - val_loss: 0.0071
<tensorflow.python.keras.callbacks.History at 0x7ff99ee2a810>

Echemos un vistazo a un resumen del codificador. Observe cómo se reduce la resolución de las imágenes de 28x28 a 7x7.

autoencoder.encoder.summary()
Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 14, 14, 16)        160       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 8)           1160      
=================================================================
Total params: 1,320
Trainable params: 1,320
Non-trainable params: 0
_________________________________________________________________

El decodificador muestra las imágenes de 7x7 a 28x28.

autoencoder.decoder.summary()
Model: "sequential_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d_transpose (Conv2DTran (None, 14, 14, 8)         584       
_________________________________________________________________
conv2d_transpose_1 (Conv2DTr (None, 28, 28, 16)        1168      
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 28, 28, 1)         145       
=================================================================
Total params: 1,897
Trainable params: 1,897
Non-trainable params: 0
_________________________________________________________________

Trazar tanto las imágenes con ruido como las imágenes sin ruido producidas por el codificador automático.

encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):

    # display original + noise
    ax = plt.subplot(2, n, i + 1)
    plt.title("original + noise")
    plt.imshow(tf.squeeze(x_test_noisy[i]))
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # display reconstruction
    bx = plt.subplot(2, n, i + n + 1)
    plt.title("reconstructed")
    plt.imshow(tf.squeeze(decoded_imgs[i]))
    plt.gray()
    bx.get_xaxis().set_visible(False)
    bx.get_yaxis().set_visible(False)
plt.show()

png

Tercer ejemplo: detección de anomalías

Visión general

En este ejemplo, se entrenará un autoencoder para detectar anomalías en el conjunto de datos ECG5000 . Este conjunto de datos contiene 5.000 electrocardiogramas , cada uno con 140 puntos de datos. Que va a utilizar una versión simplificada del conjunto de datos, en donde cada ejemplo se ha marcado ya sea 0 (correspondiente a un ritmo anormal), o 1 (correspondiente a un ritmo normal). Está interesado en identificar los ritmos anormales.

¿Cómo detectará anomalías utilizando un codificador automático? Recuerde que un codificador automático está entrenado para minimizar el error de reconstrucción. Entrenará un codificador automático solo en los ritmos normales, luego lo usará para reconstruir todos los datos. Nuestra hipótesis es que los ritmos anormales tendrán mayor error de reconstrucción. Luego, clasificará un ritmo como una anomalía si el error de reconstrucción supera un umbral fijo.

Cargar datos de ECG

El conjunto de datos que va a utilizar se basa en uno de timeseriesclassification.com .

# Download the dataset
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
dataframe.head()
# The last element contains the labels
labels = raw_data[:, -1]

# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]

train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21
)

Normalizar los datos a [0,1] .

min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)

Va a entrenar al autoencoder utilizando sólo los ritmos normales, que están marcadas en este conjunto de datos como 1 . Separe los ritmos normales de los anormales.

train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

Trace un ECG normal.

plt.grid()
plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")
plt.show()

png

Trace un ECG anómalo.

plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()

png

Construye el modelo

class AnomalyDetector(Model):
  def __init__(self):
    super(AnomalyDetector, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])

    self.decoder = tf.keras.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      layers.Dense(140, activation="sigmoid")])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = AnomalyDetector()
autoencoder.compile(optimizer='adam', loss='mae')

Observe que el autoencoder se entrena usando solo los ECG normales, pero se evalúa usando el conjunto de prueba completo.

history = autoencoder.fit(normal_train_data, normal_train_data, 
          epochs=20, 
          batch_size=512,
          validation_data=(test_data, test_data),
          shuffle=True)
Epoch 1/20
5/5 [==============================] - 0s 25ms/step - loss: 0.0611 - val_loss: 0.0545
Epoch 2/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0568 - val_loss: 0.0523
Epoch 3/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0547 - val_loss: 0.0508
Epoch 4/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0524 - val_loss: 0.0490
Epoch 5/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0495 - val_loss: 0.0472
Epoch 6/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0460 - val_loss: 0.0461
Epoch 7/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0428 - val_loss: 0.0451
Epoch 8/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0397 - val_loss: 0.0432
Epoch 9/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0364 - val_loss: 0.0414
Epoch 10/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0337 - val_loss: 0.0403
Epoch 11/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0315 - val_loss: 0.0394
Epoch 12/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0297 - val_loss: 0.0385
Epoch 13/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0282 - val_loss: 0.0378
Epoch 14/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0270 - val_loss: 0.0372
Epoch 15/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0261 - val_loss: 0.0366
Epoch 16/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0252 - val_loss: 0.0361
Epoch 17/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0245 - val_loss: 0.0355
Epoch 18/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0238 - val_loss: 0.0350
Epoch 19/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0232 - val_loss: 0.0347
Epoch 20/20
5/5 [==============================] - 0s 6ms/step - loss: 0.0227 - val_loss: 0.0342
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()
<matplotlib.legend.Legend at 0x7ff99f02ea10>

png

Pronto clasificará un ECG como anómalo si el error de reconstrucción es mayor que una desviación estándar de los ejemplos de entrenamiento normales. Primero, tracemos un ECG normal del conjunto de entrenamiento, la reconstrucción después de que el codificador automático lo codifique y decodifique, y el error de reconstrucción.

encoded_data = autoencoder.encoder(normal_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(normal_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], normal_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

png

Cree un gráfico similar, esta vez para un ejemplo de prueba anómalo.

encoded_data = autoencoder.encoder(anomalous_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(anomalous_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], anomalous_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

png

Detecta anomalías

Detecte anomalías calculando si la pérdida de reconstrucción es mayor que un umbral fijo. En este tutorial, calculará el error promedio medio para ejemplos normales del conjunto de entrenamiento, luego clasificará los ejemplos futuros como anómalos si el error de reconstrucción es mayor que una desviación estándar del conjunto de entrenamiento.

Grafique el error de reconstrucción en ECG normales del conjunto de entrenamiento

reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss[None,:], bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

png

Elija un valor de umbral que sea una desviación estándar por encima de la media.

threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)
Threshold:  0.034232758

Si examina el error de reconstrucción de los ejemplos anómalos en el conjunto de prueba, notará que la mayoría tiene un error de reconstrucción mayor que el umbral. Por varing el umbral, se puede ajustar la precisión y recordar de su clasificador.

reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(test_loss[None, :], bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

png

Clasifique un ECG como anomalía si el error de reconstrucción es mayor que el umbral.

def predict(model, data, threshold):
  reconstructions = model(data)
  loss = tf.keras.losses.mae(reconstructions, data)
  return tf.math.less(loss, threshold)

def print_stats(predictions, labels):
  print("Accuracy = {}".format(accuracy_score(labels, predictions)))
  print("Precision = {}".format(precision_score(labels, predictions)))
  print("Recall = {}".format(recall_score(labels, predictions)))
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)
Accuracy = 0.943
Precision = 0.9921722113502935
Recall = 0.9053571428571429

Próximos pasos

Para obtener más información sobre la detección de anomalías con autoencoders, echa un vistazo a este excelente ejemplo interactiva construida con TensorFlow.js de Victor Dibia. Para un caso de uso en el mundo real, se puede aprender Airbus detecta anomalías en la ISS datos de telemetría usando TensorFlow. Para aprender más acerca de los fundamentos, considerar leer esta entrada del blog de François Chollet. Para más detalles, consulte el Capítulo 14 de profundo aprendizaje por Ian Goodfellow, Yoshua Bengio, y Aaron Courville.