Avro Dataset API

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

ملخص

والهدف من أفرو الإدراجات API هو تحميل أفرو تنسيق البيانات أصلا في TensorFlow كما TensorFlow مجموعة البيانات . Avro هو نظام تسلسل بيانات مشابه لمخازن البروتوكول. يتم استخدامه على نطاق واسع في Apache Hadoop حيث يمكنه توفير تنسيق تسلسل للبيانات الثابتة وتنسيق سلكي للاتصال بين عقد Hadoop. بيانات Avro هي تنسيق بيانات ثنائي مضغوط وموجه نحو الصفوف. يعتمد على مخطط قاعدة البيانات الذي يتم تخزينه كملف JSON منفصل. للمواصفات الشكل ومخطط إعلان أفرو، يرجى الرجوع إلى دليل رسمي .

حزمة الإعداد

قم بتثبيت حزمة tensorflow-io المطلوبة

pip install tensorflow-io

حزم الاستيراد

import tensorflow as tf
import tensorflow_io as tfio

تحقق من صحة واردات tf و tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

إستعمال

استكشف مجموعة البيانات

لغرض هذا البرنامج التعليمي ، لنقم بتنزيل عينة مجموعة بيانات Avro.

تنزيل نموذج ملف Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

قم بتنزيل ملف المخطط المقابل لنموذج ملف Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

في المثال أعلاه ، تم إنشاء مجموعة بيانات اختبار Avro بناءً على مجموعة بيانات mnist. يتم إنشاء مجموعة البيانات mnist الأصلي في شكل TFRecord من TF اسمه مجموعة البيانات . ومع ذلك ، فإن مجموعة البيانات mnist كبيرة جدًا مثل مجموعة البيانات التجريبية. لغرض التبسيط ، تم قطع معظمها وتم الاحتفاظ بالسجلات القليلة الأولى فقط. وعلاوة على ذلك، وقد تم تقليم إضافي لل image حقل في مجموعة البيانات mnist الأصلي وتعيين ل features ميدانية في أفرو. وبالتالي فإن ملف أفرو train.avro لديها 4 السجلات، ولكل منها 3 مجالات: features ، وهي مجموعة من كثافة العمليات، label ، وهو صحيح أو باطل، و dataType ، وهو التعداد. لعرض فك train.avro (لاحظ ملف البيانات أفرو الأصلي ليس الإنسان يمكن قراءتها كما أفرو هو شكل مضغوط):

قم بتثبيت الحزمة المطلوبة لقراءة ملف Avro:

pip install avro

لقراءة وطباعة ملف Avro بتنسيق يمكن للبشر قراءته:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

ومخطط train.avro الذي يمثله train.avsc هو ملف بتنسيق JSON. لعرض train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

جهز مجموعة البيانات

تحميل train.avro كما بيانات TensorFlow مع API بيانات أفرو:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

على سبيل المثال المتحولين أعلاه train.avro إلى مجموعة البيانات tensorflow. كل عنصر من عناصر مجموعة البيانات عبارة عن قاموس يكون مفتاحه هو اسم الميزة ، والقيمة هي المحول المتناثر أو الكثيف. على سبيل المثال، فإنه يحول features ، label ، dataType الحقل إلى VarLenFeature (SparseTensor)، FixedLenFeature (DenseTensor)، وFixedLenFeature (DenseTensor) على التوالي. منذ batch_size هو 3، وإجبار 3 السجلات من train.avro إلى عنصر واحد في مجموعة البيانات نتيجة. للسجل الأول في train.avro الذي التسمية هو باطل، أفرو القارئ يستبدلها مع القيمة الافتراضية معينة (-100). في هذا المثال، كنت هناك 4 السجلات في المجموع في train.avro . منذ حجم الدفعي 3، ومجموعة البيانات نتيجة يحتوي على 3 عناصر، آخرها في حجم الدفعي 1. ومع ذلك المستخدم هي أيضا قادرة على إسقاط الدفعة الأخيرة إذا كان حجم أصغر من حجم الدفعة من خلال تمكين drop_final_batch . على سبيل المثال:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

يمكن للمرء أيضًا زيادة num_parallel_reads لتسريع معالجة بيانات Avro عن طريق زيادة تحليل / قراءة التوازي.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

للاستخدام مفصل ل make_avro_record_dataset ، يرجى الرجوع إلى وثيقة API .

تدريب نماذج tf.keras باستخدام مجموعة بيانات Avro

الآن دعنا نتعرف على مثال شامل للتدريب على نموذج tf.keras باستخدام مجموعة بيانات Avro بناءً على مجموعة بيانات mnist.

تحميل train.avro كما بيانات TensorFlow مع API بيانات أفرو:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

حدد نموذج keras بسيطًا:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

تدريب نموذج keras باستخدام مجموعة بيانات Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

يمكن لمجموعة بيانات avro تحليل وإجبار أي بيانات avro في موتر TensorFlow ، بما في ذلك السجلات في السجلات والخرائط والمصفوفات والفروع والتعدادات. يتم تمرير معلومات التحليل إلى تنفيذ مجموعة بيانات avro كخريطة حيث تقوم المفاتيح بتشفير كيفية تحليل قيم البيانات المشفرة حول كيفية إجبار البيانات على موترات TensorFlow - تحديد النوع الأولي (على سبيل المثال ، bool ، int ، long ، float ، double ، string ) وكذلك نوع الموتر (مثل متفرق أو كثيف). يتم توفير قائمة بأنواع محلل TensorFlow (انظر الجدول 1) وإكراه الأنواع البدائية (الجدول 2).

الجدول 1 أنواع المحلل اللغوي TensorFlow المدعومة:

أنواع المحلل اللغوي TensorFlow موتر TensorFlow تفسير
tf.FixedLenFeature ([]، tf.int32) موتر كثيف تحليل ميزة طول ثابت ؛ أي أن جميع الصفوف لها نفس العدد الثابت للعناصر ، على سبيل المثال عنصر واحد فقط أو مصفوفة تحتوي دائمًا على نفس عدد العناصر لكل صف
tf.SparseFeature (index_key = ['key_1st_index'، 'key_2nd_index']، value_key = 'key_value'، dtype = tf.int64، size = [20، 50]) موتر متفرق تحليل ميزة متفرقة حيث يحتوي كل صف على قائمة متغيرة الطول من المؤشرات والقيم. يحدد "index_key" المؤشرات. يحدد "value_key" القيمة. "dtype" هو نوع البيانات. "الحجم" هو الحد الأقصى لقيمة المؤشر المتوقعة لكل مدخل فهرس
tfio.experimental.columnar.VarLenFeatureWithRank ([]، tf.int64) موتر متفرق تحليل سمة متغيرة الطول ؛ هذا يعني أن كل صف بيانات يمكن أن يحتوي على عدد متغير من العناصر ، على سبيل المثال ، الصف الأول يحتوي على 5 عناصر ، والصف الثاني يحتوي على 7 عناصر

الجدول 2 - التحويل المدعوم من أنواع Avro إلى أنواع TensorFlow:

النوع البدائي Avro نوع TensorFlow البدائي
منطقي: قيمة ثنائية tf.bool
بايت: تسلسل من 8 بت بايت بدون إشارة tf.string
مزدوج: رقم الفاصلة العائمة IEEE ذو الدقة المزدوجة 64 بت tf.float64
التعداد: نوع العد tf.string باستخدام اسم الرمز
تعويم: رقم الفاصلة العائمة IEEE بدقة 32 بت tf.float32
int: 32 بت توقيع عدد صحيح tf.int32
طويل: 64 بت عدد صحيح موقّع tf.int64
خالية: لا قيمة يستخدم القيمة الافتراضية
السلسلة: تسلسل أحرف unicode tf.string

وتقدم مجموعة شاملة من الأمثلة على API بيانات أفرو في الاختبارات .