מערכי נתונים של Tensorflow מאוספי MongoDB

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

הדרכה זו מתמקדת בהכנת tf.data.Dataset ים על ידי קריאת נתונים מאוספים MongoDB ולהשתמש בו עבור אימון tf.keras מודל.

חבילות התקנה

שימושים הדרכה זו pymongo כחבילה עוזר ליצור מסד נתונים MongoDB חדש ואיסוף לאחסן את הנתונים.

התקן את חבילות tensorflow-io ו-mongodb (עוזר) הנדרשות

pip install -q tensorflow-io
pip install -q pymongo

ייבוא ​​חבילות

import os
import time
from pprint import pprint
from sklearn.model_selection import train_test_split
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_io as tfio
from pymongo import MongoClient

אימות יבוא tf ו-tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.20.0
tensorflow version: 2.6.0

הורד והגדר את מופע MongoDB

למטרות הדגמה, נעשה שימוש בגרסת הקוד הפתוח של mongodb.


sudo apt install -y mongodb >log
service mongodb start

* Starting database mongodb
   ...done.
WARNING: apt does not have a stable CLI interface. Use with caution in scripts.

debconf: unable to initialize frontend: Dialog
debconf: (No usable dialog-like program is installed, so the dialog based frontend cannot be used. at /usr/share/perl5/Debconf/FrontEnd/Dialog.pm line 76, <> line 8.)
debconf: falling back to frontend: Readline
debconf: unable to initialize frontend: Readline
debconf: (This frontend requires a controlling tty.)
debconf: falling back to frontend: Teletype
dpkg-preconfigure: unable to re-open stdin:
# Sleep for few seconds to let the instance start.
time.sleep(5)

לאחר המקרה כבר נכתב, grep עבור mongo בתהליכי הרשימה כדי לאשר את הזמינות.


ps -ef | grep mongo
mongodb      580       1 13 17:38 ?        00:00:00 /usr/bin/mongod --config /etc/mongodb.conf
root         612     610  0 17:38 ?        00:00:00 grep mongo

שאל את נקודת הקצה הבסיסית כדי לאחזר מידע על האשכול.

client = MongoClient()
client.list_database_names() # ['admin', 'local']
['admin', 'local']

חקור את מערך הנתונים

לצורך הדרכה זו, מאפשרת להוריד את PetFinder במערך ולהאכיל את הנתונים MongoDB ידני. המטרה של בעיית סיווג זו היא לחזות אם חיית המחמד תאומץ או לא.

dataset_url = 'http://storage.googleapis.com/download.tensorflow.org/data/petfinder-mini.zip'
csv_file = 'datasets/petfinder-mini/petfinder-mini.csv'
tf.keras.utils.get_file('petfinder_mini.zip', dataset_url,
                        extract=True, cache_dir='.')
pf_df = pd.read_csv(csv_file)
Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/petfinder-mini.zip
1671168/1668792 [==============================] - 0s 0us/step
1679360/1668792 [==============================] - 0s 0us/step
pf_df.head()

לצורך המדריך, מתבצעים שינויים בעמודת התווית. 0 יציין שחיית המחמד לא אומצה, ו-1 יציין שכן.

# In the original dataset "4" indicates the pet was not adopted.
pf_df['target'] = np.where(pf_df['AdoptionSpeed']==4, 0, 1)

# Drop un-used columns.
pf_df = pf_df.drop(columns=['AdoptionSpeed', 'Description'])
# Number of datapoints and columns
len(pf_df), len(pf_df.columns)
(11537, 14)

פצל את מערך הנתונים

train_df, test_df = train_test_split(pf_df, test_size=0.3, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
Number of training samples:  8075
Number of testing sample:  3462

אחסן את נתוני הרכבת והבדיקה באוספים של מונגו

URI = "mongodb://localhost:27017"
DATABASE = "tfiodb"
TRAIN_COLLECTION = "train"
TEST_COLLECTION = "test"
db = client[DATABASE]
if "train" not in db.list_collection_names():
  db.create_collection(TRAIN_COLLECTION)
if "test" not in db.list_collection_names():
  db.create_collection(TEST_COLLECTION)
def store_records(collection, records):
  writer = tfio.experimental.mongodb.MongoDBWriter(
      uri=URI, database=DATABASE, collection=collection
  )
  for record in records:
      writer.write(record)
store_records(collection="train", records=train_df.to_dict("records"))
time.sleep(2)
store_records(collection="test", records=test_df.to_dict("records"))

הכן מערכי נתונים של tfio

לאחר שהנתונים זמינים האשכול, mongodb.MongoDBIODataset בכיתה מנוצל למטרה זו. היורש בכיתה מ tf.data.Dataset ובכך חושף את כול פונקציות השימושיות של tf.data.Dataset מהקופסה.

מערך נתונים לאימון

train_ds = tfio.experimental.mongodb.MongoDBIODataset(
        uri=URI, database=DATABASE, collection=TRAIN_COLLECTION
    )

train_ds
Connection successful: mongodb://localhost:27017
WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tensorflow/python/data/experimental/ops/counter.py:66: scan (from tensorflow.python.data.experimental.ops.scan_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.scan(...) instead
WARNING:tensorflow:From /usr/local/lib/python3.7/dist-packages/tensorflow_io/python/experimental/mongodb_dataset_ops.py:114: take_while (from tensorflow.python.data.experimental.ops.take_while_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.take_while(...)
<MongoDBIODataset shapes: (), types: tf.string>

כל פריט train_ds הוא מחרוזת אשר צריך להיות מפוענח לתוך JSON. כדי לעשות זאת, אתה יכול לבחור רק מקבץ של העמודות על ידי ציון TensorSpec

# Numeric features.
numerical_cols = ['PhotoAmt', 'Fee'] 

SPECS = {
    "target": tf.TensorSpec(tf.TensorShape([]), tf.int64, name="target"),
}
for col in numerical_cols:
  SPECS[col] = tf.TensorSpec(tf.TensorShape([]), tf.int32, name=col)
pprint(SPECS)
{'Fee': TensorSpec(shape=(), dtype=tf.int32, name='Fee'),
 'PhotoAmt': TensorSpec(shape=(), dtype=tf.int32, name='PhotoAmt'),
 'target': TensorSpec(shape=(), dtype=tf.int64, name='target')}
BATCH_SIZE=32
train_ds = train_ds.map(
        lambda x: tfio.experimental.serialization.decode_json(x, specs=SPECS)
    )

# Prepare a tuple of (features, label)
train_ds = train_ds.map(lambda v: (v, v.pop("target")))
train_ds = train_ds.batch(BATCH_SIZE)

train_ds
<BatchDataset shapes: ({PhotoAmt: (None,), Fee: (None,)}, (None,)), types: ({PhotoAmt: tf.int32, Fee: tf.int32}, tf.int64)>

בדיקת מערך נתונים

test_ds = tfio.experimental.mongodb.MongoDBIODataset(
        uri=URI, database=DATABASE, collection=TEST_COLLECTION
    )
test_ds = test_ds.map(
        lambda x: tfio.experimental.serialization.decode_json(x, specs=SPECS)
    )
# Prepare a tuple of (features, label)
test_ds = test_ds.map(lambda v: (v, v.pop("target")))
test_ds = test_ds.batch(BATCH_SIZE)

test_ds
Connection successful: mongodb://localhost:27017
<BatchDataset shapes: ({PhotoAmt: (None,), Fee: (None,)}, (None,)), types: ({PhotoAmt: tf.int32, Fee: tf.int32}, tf.int64)>

הגדר את שכבות העיבוד המקדים של keras

על פי הדרכת נתונים המובנהית , מומלץ להשתמש שכבות Keras מקדים כפי שהם אינטואיטיבי יותר, והוא יכול בקלות להיות משולב עם המודלים. עם זאת, תקן feature_columns יכול לשמש גם.

עבור הבנה טובה יותר של preprocessing_layers בסיווג נתונים מובנים, עיין הדרכה נתונים מובנים

def get_normalization_layer(name, dataset):
  # Create a Normalization layer for our feature.
  normalizer = preprocessing.Normalization(axis=None)

  # Prepare a Dataset that only yields our feature.
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the statistics of the data.
  normalizer.adapt(feature_ds)

  return normalizer
all_inputs = []
encoded_features = []

for header in numerical_cols:
  numeric_col = tf.keras.Input(shape=(1,), name=header)
  normalization_layer = get_normalization_layer(header, train_ds)
  encoded_numeric_col = normalization_layer(numeric_col)
  all_inputs.append(numeric_col)
  encoded_features.append(encoded_numeric_col)

בנה, הידור ואימון המודל

# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10
# Convert the feature columns into a tf.keras layer
all_features = tf.keras.layers.concatenate(encoded_features)

# design/build the model
x = tf.keras.layers.Dense(32, activation="relu")(all_features)
x = tf.keras.layers.Dropout(0.5)(x)
x = tf.keras.layers.Dense(64, activation="relu")(x)
x = tf.keras.layers.Dropout(0.5)(x)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(all_inputs, output)
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
Epoch 1/10
109/109 [==============================] - 1s 2ms/step - loss: 0.6261 - accuracy: 0.4711
Epoch 2/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5939 - accuracy: 0.6967
Epoch 3/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5900 - accuracy: 0.6993
Epoch 4/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5846 - accuracy: 0.7146
Epoch 5/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5824 - accuracy: 0.7178
Epoch 6/10
109/109 [==============================] - 0s 2ms/step - loss: 0.5778 - accuracy: 0.7233
Epoch 7/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5810 - accuracy: 0.7083
Epoch 8/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5791 - accuracy: 0.7149
Epoch 9/10
109/109 [==============================] - 0s 3ms/step - loss: 0.5742 - accuracy: 0.7207
Epoch 10/10
109/109 [==============================] - 0s 2ms/step - loss: 0.5797 - accuracy: 0.7083
<keras.callbacks.History at 0x7f743229fe90>

הסיק על נתוני הבדיקה

res = model.evaluate(test_ds)
print("test loss, test acc:", res)
109/109 [==============================] - 0s 2ms/step - loss: 0.5696 - accuracy: 0.7383
test loss, test acc: [0.569588840007782, 0.7383015751838684]

הפניות: