Ensembles de données Tensorflow des collections MongoDB

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

Ce tutoriel se concentre sur la préparation tf.data.Dataset s par la lecture des données de collections MongoDB et l' utiliser pour la formation d' un tf.keras modèle.

Paquets d'installation

Ce tutoriel utilise pymongo comme un paquet d'aide pour créer une nouvelle base de données MongoDB et de collecte pour stocker les données.

Installez les packages tensorflow-io et mongodb (helper) requis

pip install -q tensorflow-io
pip install -q pymongo

Importer des packages

import os
import time
from pprint import pprint
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_io as tfio
from pymongo import MongoClient

Valider les importations tf et tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.20.0
tensorflow version: 2.6.0

Téléchargez et configurez l'instance MongoDB

À des fins de démonstration, la version open source de mongodb est utilisée.


sudo apt install -y mongodb >log
service mongodb start

* Starting database mongodb
   ...done.
WARNING: apt does not have a stable CLI interface. Use with caution in scripts.

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 8.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin:
# Sleep for few seconds to let the instance start.
time.sleep(5)

Une fois que l'instance a été lancé, grep pour mongo dans les processus liste pour confirmer la disponibilité.


ps -ef | grep mongo
mongodb      580       1 13 17:38 ?        00:00:00 /usr/bin/mongod --config /etc/mongodb.conf
root         612     610  0 17:38 ?        00:00:00 grep mongo

interrogez le point de terminaison de base pour récupérer des informations sur le cluster.

client = MongoClient()
client.list_database_names() # ['admin', 'local']
['admin', 'local']

Explorer l'ensemble de données

Aux fins de ce tutoriel, permet de télécharger le PetFinder ensemble de données et rentrer les données dans MongoDB manuellement. Le but de ce problème de classification est de prédire si l'animal sera adopté ou non.

dataset_url = 'http://storage.googleapis.com/download.tensorflow.org/data/petfinder-mini.zip'
csv_file = 'datasets/petfinder-mini/petfinder-mini.csv'
tf.keras.utils.get_file('petfinder_mini.zip', dataset_url,
                        extract=True, cache_dir='.')
pf_df = pd.read_csv(csv_file)
Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/petfinder-mini.zip
1671168/1668792 [==============================] - 0s 0us/step
1679360/1668792 [==============================] - 0s 0us/step
pf_df.head()

Pour les besoins du didacticiel, des modifications sont apportées à la colonne d'étiquette. 0 indiquera que l'animal n'a pas été adopté et 1 indiquera qu'il l'a été.

# In the original dataset "4" indicates the pet was not adopted.
pf_df['target'] = np.where(pf_df['AdoptionSpeed']==4, 0, 1)

# Drop un-used columns.
pf_df = pf_df.drop(columns=['AdoptionSpeed', 'Description'])
# Number of datapoints and columns
len(pf_df), len(pf_df.columns)
(11537, 14)

Diviser l'ensemble de données

train_df, test_df = train_test_split(pf_df, test_size=0.3, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
Number of training samples:  8075
Number of testing sample:  3462

Stockez les données de train et de test dans les collections mongo

URI = "mongodb://localhost:27017"
DATABASE = "tfiodb"
TRAIN_COLLECTION = "train"
TEST_COLLECTION = "test"
db = client[DATABASE]
if "train" not in db.list_collection_names():
  db.create_collection(TRAIN_COLLECTION)
if "test" not in db.list_collection_names():
  db.create_collection(TEST_COLLECTION)
def store_records(collection, records):
  writer = tfio.experimental.mongodb.MongoDBWriter(
      uri=URI, database=DATABASE, collection=collection
  )
  for record in records:
      writer.write(record)
store_records(collection="train", records=train_df.to_dict("records"))
time.sleep(2)
store_records(collection="test", records=test_df.to_dict("records"))

Préparer les jeux de données tfio

Une fois que les données sont disponibles dans le cluster, la mongodb.MongoDBIODataset classe est utilisée à cette fin. La classe hérite de tf.data.Dataset et expose ainsi toutes les fonctionnalités utiles de tf.data.Dataset hors de la boîte.

Ensemble de données d'entraînement

train_ds = tfio.experimental.mongodb.MongoDBIODataset(
        uri=URI, database=DATABASE, collection=TRAIN_COLLECTION
    )

train_ds
Connection successful: mongodb://localhost:27017
WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tensorflow/python/data/experimental/ops/counter.py:66: scan (from tensorflow.python.data.experimental.ops.scan_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.scan(...) instead
WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tensorflow_io/python/experimental/mongodb_dataset_ops.py:114: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)
<MongoDBIODataset shapes: (), types: tf.string>

Chaque élément de train_ds est une chaîne qui doit être décodé en JSON. Pour ce faire, vous pouvez sélectionner un sous - ensemble des colonnes en spécifiant le TensorSpec

# Numeric features.
numerical_cols = ['PhotoAmt', 'Fee'] 

SPECS = {
    "target": tf.TensorSpec(tf.TensorShape([]), tf.int64, name="target"),
}
for col in numerical_cols:
  SPECS[col] = tf.TensorSpec(tf.TensorShape([]), tf.int32, name=col)
pprint(SPECS)
{'Fee': TensorSpec(shape=(), dtype=tf.int32, name='Fee'),
 'PhotoAmt': TensorSpec(shape=(), dtype=tf.int32, name='PhotoAmt'),
 'target': TensorSpec(shape=(), dtype=tf.int64, name='target')}
BATCH_SIZE=32
train_ds = train_ds.map(
        lambda x: tfio.experimental.serialization.decode_json(x, specs=SPECS)
    )

# Prepare a tuple of (features, label)
train_ds = train_ds.map(lambda v: (v, v.pop("target")))
train_ds = train_ds.batch(BATCH_SIZE)

train_ds
<BatchDataset shapes: ({PhotoAmt: (None,), Fee: (None,)}, (None,)), types: ({PhotoAmt: tf.int32, Fee: tf.int32}, tf.int64)>

Jeu de données de test

test_ds = tfio.experimental.mongodb.MongoDBIODataset(
        uri=URI, database=DATABASE, collection=TEST_COLLECTION
    )
test_ds = test_ds.map(
        lambda x: tfio.experimental.serialization.decode_json(x, specs=SPECS)
    )
# Prepare a tuple of (features, label)
test_ds = test_ds.map(lambda v: (v, v.pop("target")))
test_ds = test_ds.batch(BATCH_SIZE)

test_ds
Connection successful: mongodb://localhost:27017
<BatchDataset shapes: ({PhotoAmt: (None,), Fee: (None,)}, (None,)), types: ({PhotoAmt: tf.int32, Fee: tf.int32}, tf.int64)>

Définir les couches de prétraitement des keras

Selon le didacticiel de données structurées , il est recommandé d'utiliser les couches Keras Prétraitement car ils sont plus intuitive, et peut être facilement intégré aux modèles. Cependant, la norme feature_columns peuvent également être utilisés.

Pour une meilleure compréhension des preprocessing_layers dans la classification des données structurées, s'il vous plaît se référer au tutoriel de données structurées

def get_normalization_layer(name, dataset):
  # Create a Normalization layer for our feature.
  normalizer = preprocessing.Normalization(axis=None)

  # Prepare a Dataset that only yields our feature.
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the statistics of the data.
  normalizer.adapt(feature_ds)

  return normalizer
all_inputs = []
encoded_features = []

for header in numerical_cols:
  numeric_col = tf.keras.Input(shape=(1,), name=header)
  normalization_layer = get_normalization_layer(header, train_ds)
  encoded_numeric_col = normalization_layer(numeric_col)
  all_inputs.append(numeric_col)
  encoded_features.append(encoded_numeric_col)

Construire, compiler et entraîner le modèle

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# Convert the feature columns into a tf.keras layer
all_features = tf.keras.layers.concatenate(encoded_features)

# design/build the model
x = tf.keras.layers.Dense(32, activation="relu")(all_features)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Dense(64, activation="relu")(x)
x = tf.keras.layers.Dropout(0.5)(x)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(all_inputs, output)
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
109/109 [==============================] - 1s 2ms/step - loss: 0.6261 - accuracy: 0.4711
Epoch 2/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5939 - accuracy: 0.6967
Epoch 3/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5900 - accuracy: 0.6993
Epoch 4/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5846 - accuracy: 0.7146
Epoch 5/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5824 - accuracy: 0.7178
Epoch 6/10
109/109 [==============================] - 0s 2ms/step - loss: 0.5778 - accuracy: 0.7233
Epoch 7/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5810 - accuracy: 0.7083
Epoch 8/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5791 - accuracy: 0.7149
Epoch 9/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5742 - accuracy: 0.7207
Epoch 10/10
109/109 [==============================] - 0s 2ms/step - loss: 0.5797 - accuracy: 0.7083
<keras.callbacks.History at 0x7f743229fe90>

Déduire sur les données de test

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
109/109 [==============================] - 0s 2ms/step - loss: 0.5696 - accuracy: 0.7383
test loss, test acc: [0.569588840007782, 0.7383015751838684]

Les références: