Avro Dataset API

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

מטרת API מערך נתוני אברו היא לטעון מעוצב אברו נתונים מקורי לתוך TensorFlow כמו במערך TensorFlow . Avro היא מערכת הסדרת נתונים הדומה למאגרי פרוטוקול. זה נמצא בשימוש נרחב ב- Apache Hadoop שבו הוא יכול לספק גם פורמט סידורי עבור נתונים קבועים וגם פורמט תיל לתקשורת בין צמתי Hadoop. Avro data הוא פורמט נתונים בינארי דחוס מכוון שורה. זה מסתמך על סכימה המאוחסנת כקובץ JSON נפרד. עבור המפרט של הצהרת סכימת פורמט אברו, עיין במדריך הרשמי .

חבילת התקנה

התקן את חבילת tensorflow-io הנדרשת

pip install tensorflow-io

ייבוא ​​חבילות

import tensorflow as tf
import tensorflow_io as tfio

אימות יבוא tf ו-tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

נוֹהָג

חקור את מערך הנתונים

לצורך הדרכה זו, הבה נוריד את מערך הנתונים לדוגמה של Avro.

הורד קובץ Avro לדוגמה:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

הורד את קובץ הסכימה המתאים של קובץ Avro לדוגמה:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

בדוגמה שלמעלה, נוצר מערך נתונים של Avro לבדיקה המבוסס על מערך נתונים של mnist. בסיס הנתונים mnist המקורי בפורמט TFRecord מופק במערך בשם TF . עם זאת, מערך הנתונים של mnist גדול מדי כמערך נתונים של הדגמה. לשם הפשטות, רובו נחתך ורק כמה רישומים ראשונים נשמרו. יתר על כן, זמירה נוספת נעשתה עבור image שדה במערך mnist המקורי ומיפו אותו features השדה אברו. אז את הקובץ אברו train.avro יש 4 רשומות, שלכל אחד מהם יש 3 שדות: features , אשר הוא מערך של int, label , מספר שלם או null, ואת dataType , גידול enum. להצגה המפוענחת train.avro (הערת קובץ נתון אברו המקורי אינה קריאה כמו אברו הוא פורמט דחוס):

התקן את החבילה הנדרשת לקריאת קובץ Avro:

pip install avro

כדי לקרוא ולהדפיס קובץ Avro בפורמט הניתן לקריאה אנושית:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

וגם הסכימה של train.avro אשר מיוצגת על ידי train.avsc הוא קובץ בתבנית JSON. להצגת train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

הכן את מערך הנתונים

טען train.avro כמו במערך TensorFlow עם API הנתונים אברו:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

למומרים בדוגמה לעיל train.avro לתוך הנתונים tensorflow. כל רכיב במערך הנתונים הוא מילון שהמפתח שלו הוא שם התכונה, הערך הוא הטנסור הדליל או הצפוף שהומר. לדוגמא, הוא ממיר features , label , dataType בשטח על VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor), ו FixedLenFeature (DenseTensor) בהתאמה. מאז batch_size הוא 3, זה לכפות 3 רשומות train.avro לתוך אלמנט אחד הנתונים תוצאה. למען הפרוטוקול הראשון train.avro התווית שלו הוא null, מחליף קורא אברו זה עם ערך ברירת המחדל המוגדר (-100). בדוגמה זו, יש כבר 4 רשומות בסך הכל ב train.avro . מאז גודל אצווה הוא 3, בסיס הנתונים תוצאה מכיל 3 אלמנטים, שעבר גודל אצווה של מי הוא 1. עם זאת המשתמש הוא גם מסוגל לוותר על המנה האחרונה, אם גודל קטן יותר בגודל המנה, בכך שהיא מאפשרת drop_final_batch . לְמָשָׁל:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

אפשר גם להגדיל את num_parallel_reads כדי לזרז את עיבוד הנתונים של Avro על ידי הגדלת מקביליות avro parse/read.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

עבור השימוש מפורט של make_avro_record_dataset , עיין doc API .

הרכבת מודלים של tf.keras עם מערך נתונים של Avro

כעת נעבור על דוגמה מקצה לקצה של אימון מודל tf.keras עם מערך נתונים של Avro המבוסס על מערך נתונים של mnist.

טען train.avro כמו במערך TensorFlow עם API הנתונים אברו:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

הגדר דגם פשוט של קרס:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

אמן את מודל ה-keras עם מערך הנתונים של Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

מערך הנתונים של avro יכול לנתח ולאלץ כל נתוני avro לתוך טנסור TensorFlow, כולל רשומות ברשומות, מפות, מערכים, ענפים וספירות. מידע הניתוח מועבר למימוש הנתונים של avro כמפה שבה מפתחות מקודדים כיצד לנתח את ערכי הנתונים המקודדים כיצד לכפות את הנתונים לתוך טנסור TensorFlow - מחליטים על הסוג הפרימיטיבי (למשל bool, int, long, float, double, string ) וכן סוג הטנזור (למשל דליל או צפוף). מוצגת רשימה של סוגי הנתח של TensorFlow (ראה טבלה 1) והכפייה של טיפוסים פרימיטיביים (טבלה 2).

טבלה 1 סוגי מנתח TensorFlow הנתמכים:

סוגי מנתח TensorFlow TensorFlow Tensors הֶסבֵּר
tf.FixedLenFeature([], tf.int32) טנסור צפוף נתח תכונה באורך קבוע; כלומר לכל השורות יש אותו מספר קבוע של אלמנטים, למשל רק אלמנט אחד או מערך שיש לו תמיד אותו מספר אלמנטים עבור כל שורה
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) טנסור דליל נתח תכונה דלילה כאשר לכל שורה יש רשימה באורך משתנה של מדדים וערכים. ה-'index_key' מזהה את המדדים. ה-'value_key' מזהה את הערך. ה-'dtype' הוא סוג הנתונים. ה'גודל' הוא ערך המדד המקסימלי הצפוי עבור כל ערך אינדקס
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) טנסור דליל נתח תכונה באורך משתנה; כלומר לכל שורת נתונים יכולה להיות מספר משתנה של אלמנטים, למשל בשורה הראשונה יש 5 אלמנטים, בשורה השנייה יש 7 אלמנטים

טבלה 2 ההמרה הנתמכת מסוגי Avro לסוגים של TensorFlow:

סוג אברו פרימיטיבי סוג TensorFlow פרימיטיבי
בוליאני: ערך בינארי tf.bool
בתים: רצף של בתים ללא סימנים של 8 סיביות tf.string
כפול: דיוק כפול 64 סיביות IEEE מספר נקודה צפה tf.float64
enum: סוג ספירה tf.string באמצעות שם הסמל
צף: מספר נקודה צפה IEEE דיוק יחיד 32 סיביות tf.float32
int: מספר שלם בסימן 32 סיביות tf.int32
ארוך: מספר שלם בסימן 64 סיביות tf.int64
null: אין ערך משתמש בערך ברירת מחדל
מחרוזת: רצף תווים ביוניקוד tf.string

אוסף מקיף של דוגמאות של API הנתונים אברו מסופק בתוך המבחנים .