Avro Dataset API

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

ภาพรวม

วัตถุประสงค์ของการรว์ชุดข้อมูล API คือการโหลดข้อมูลในรูปแบบรว์กำเนิดเข้า TensorFlow เป็น TensorFlow ชุด รว์เป็นระบบซีเรียลไลซ์เซชั่นข้อมูลที่คล้ายกับโปรโตคอลบัฟเฟอร์ มีการใช้กันอย่างแพร่หลายใน Apache Hadoop ซึ่งให้ทั้งรูปแบบการทำให้เป็นอนุกรมสำหรับข้อมูลถาวร และรูปแบบสายสำหรับการสื่อสารระหว่างโหนด Hadoop ข้อมูล Avro เป็นรูปแบบข้อมูลไบนารีแบบย่อแถวเชิงแถว มันอาศัยสคีมาซึ่งจัดเก็บเป็นไฟล์ JSON แยกต่างหาก สำหรับสเปคของรูปแบบรว์และการประกาศสคีมาโปรดดูที่ คู่มืออย่างเป็นทางการ

แพ็คเกจติดตั้ง

ติดตั้งแพ็คเกจ tensorflow-io ที่จำเป็น

pip install tensorflow-io

นำเข้าแพ็คเกจ

import tensorflow as tf
import tensorflow_io as tfio

ตรวจสอบการนำเข้า tf และ tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

การใช้งาน

สำรวจชุดข้อมูล

สำหรับจุดประสงค์ของบทช่วยสอนนี้ ให้ดาวน์โหลดชุดข้อมูลตัวอย่าง Avro

ดาวน์โหลดตัวอย่างไฟล์ Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

ดาวน์โหลดไฟล์สคีมาที่เกี่ยวข้องของไฟล์ตัวอย่าง Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

ในตัวอย่างข้างต้น ชุดข้อมูลการทดสอบ Avro ถูกสร้างขึ้นตามชุดข้อมูล mnist ชุดข้อมูล mnist เดิมในรูปแบบ TFRecord ถูกสร้างขึ้นจาก ชุด TF ชื่อ อย่างไรก็ตาม ชุดข้อมูล mnist มีขนาดใหญ่เกินไปสำหรับชุดข้อมูลสาธิต เพื่อความเรียบง่าย ส่วนใหญ่จะถูกตัดแต่งและบันทึกเพียงไม่กี่รายการแรกเท่านั้น นอกจากนี้การตัดแต่งเพิ่มเติมได้ทำสำหรับ image สนามในชุดข้อมูลที่ mnist เดิมและแมปไปยัง features ในสนามรว์ ดังนั้นแฟ้มรว์ train.avro มี 4 ระเบียนแต่ละที่มี 3 สาขา: features ซึ่งเป็น array ของ int เป็น label เป็น int หรือโมฆะและ dataType การ enum เพื่อดูถอดรหัส train.avro (หมายเหตุ แฟ้มข้อมูลรว์เดิม ไม่ได้เป็นคนอ่านเป็นรว์เป็นรูปแบบกระชับ):

ติดตั้งแพ็คเกจที่จำเป็นเพื่ออ่านไฟล์ Avro:

pip install avro

หากต้องการอ่านและพิมพ์ไฟล์ Avro ในรูปแบบที่มนุษย์อ่านได้:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

และคีมาของ train.avro ซึ่งเป็นตัวแทนจาก train.avsc เป็นไฟล์รูปแบบ JSON เพื่อดู train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

เตรียมชุดข้อมูล

โหลด train.avro เป็นชุดข้อมูลที่ TensorFlow กับรว์ชุด API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

แปลงตัวอย่างข้างต้น train.avro เข้าไปในชุดข้อมูลที่ tensorflow แต่ละองค์ประกอบของชุดข้อมูลคือพจนานุกรมที่มีคีย์เป็นชื่อคุณลักษณะ ค่าคือเทนเซอร์เบาบางหรือหนาแน่นที่แปลงแล้ว เช่นจะแปลง features , label , dataType ฟิลด์ไป VarLenFeature (SparseTensor) FixedLenFeature (DenseTensor) และ FixedLenFeature (DenseTensor) ตามลำดับ ตั้งแต่ batch_size คือ 3 มันบีบบังคับ 3 ระเบียนจาก train.avro เข้าองค์ประกอบหนึ่งในชุดผล สำหรับการบันทึกครั้งแรกใน train.avro ที่มีฉลากเป็นโมฆะรว์แทนที่อ่านมันด้วยค่าเริ่มต้นที่ระบุ (-100) ในตัวอย่างนี้มีกำลัง 4 ระเบียนทั้งหมดใน train.avro ตั้งแต่ชุดขนาดคือ 3 ชุดผลมี 3 องค์ประกอบที่ผ่านมาซึ่งเป็นชุดขนาด 1 อย่างไรก็ตามผู้ใช้ยังสามารถที่จะวางชุดที่ผ่านมาหากมีขนาดที่เล็กกว่าขนาดชุดโดยการช่วยให้ drop_final_batch เช่น:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

นอกจากนี้ยังสามารถเพิ่ม num_parallel_reads เพื่อเร่งการประมวลผลข้อมูล Avro โดยเพิ่ม avro parse/read parallelism

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

สำหรับการใช้งานในรายละเอียดของ make_avro_record_dataset โปรดดูที่ API doc

ฝึกโมเดล tf.keras ด้วยชุดข้อมูล Avro

ตอนนี้ มาดูตัวอย่างแบบ end-to-end ของการฝึกอบรมโมเดล tf.keras ด้วยชุดข้อมูล Avro ตามชุดข้อมูล mnist

โหลด train.avro เป็นชุดข้อมูลที่ TensorFlow กับรว์ชุด API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

กำหนดโมเดล keras อย่างง่าย:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

ฝึกโมเดล keras ด้วยชุดข้อมูล Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

ชุดข้อมูล avro สามารถแยกวิเคราะห์และบีบบังคับข้อมูล avro ใด ๆ ลงในเทนเซอร์ TensorFlow รวมถึงบันทึกในบันทึก แผนที่ อาร์เรย์ สาขา และการแจงนับ ข้อมูลการแยกวิเคราะห์จะถูกส่งผ่านไปยังการนำชุดข้อมูล avro ไปใช้เป็นแผนที่โดยที่คีย์เข้ารหัสวิธีแยกวิเคราะห์ค่าข้อมูลจะเข้ารหัสวิธีบีบบังคับข้อมูลลงในเทนเซอร์ TensorFlow - ตัดสินใจเลือกประเภทดั้งเดิม (เช่น bool, int, long, float, double, string ) เช่นเดียวกับประเภทเทนเซอร์ (เช่น เบาบางหรือหนาแน่น) แสดงรายการประเภท parser ของ TensorFlow (ดูตารางที่ 1) และการบังคับประเภทดั้งเดิม (ตารางที่ 2)

ตารางที่ 1 ชนิดตัวแยกวิเคราะห์ TensorFlow ที่รองรับ:

ประเภทตัวแยกวิเคราะห์ TensorFlow เทนเซอร์โฟลว์ เทนเซอร์ คำอธิบาย
tf.FixedLenFeature([], tf.int32) เทนเซอร์หนาแน่น แยกวิเคราะห์คุณสมบัติความยาวคงที่ นั่นคือแถวทั้งหมดมีจำนวนองค์ประกอบคงที่เท่ากันเช่นองค์ประกอบเดียวหรืออาร์เรย์ที่มีจำนวนองค์ประกอบเท่ากันสำหรับแต่ละแถวเสมอ
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) เทนเซอร์เบาบาง แยกวิเคราะห์คุณลักษณะแบบกระจายซึ่งแต่ละแถวมีรายการดัชนีและค่าความยาวผันแปรได้ 'index_key' ระบุดัชนี 'value_key' ระบุค่า 'dtype' คือประเภทข้อมูล 'ขนาด' คือค่าดัชนีสูงสุดที่คาดหวังสำหรับแต่ละรายการดัชนี
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) เทนเซอร์เบาบาง แยกวิเคราะห์คุณสมบัติความยาวผันแปร หมายความว่าแต่ละแถวข้อมูลสามารถมีจำนวนตัวแปรได้ เช่น แถวที่ 1 มี 5 องค์ประกอบ แถวที่ 2 มี 7 องค์ประกอบ

ตารางที่ 2 การแปลงที่รองรับจากประเภท Avro เป็นประเภท TensorFlow:

Avro Primitive Type TensorFlow Primitive Type
บูลีน: ค่าไบนารี tf.bool
ไบต์: ลำดับของไบต์ที่ไม่ได้ลงนาม 8 บิต tf.string
สองเท่า: หมายเลขจุดลอยตัว IEEE 64 บิตความแม่นยำสองเท่า tf.float64
enum: ประเภทการแจงนับ tf.string โดยใช้ชื่อสัญลักษณ์
float: หมายเลขทศนิยม IEEE 32 บิตความแม่นยำเดียว tf.float32
int: จำนวนเต็มที่ลงนาม 32 บิต tf.int32
ยาว: จำนวนเต็มลงนาม 64 บิต tf.int64
null: ไม่มีค่า ใช้ค่าเริ่มต้น
สตริง: ลำดับอักขระยูนิโค้ด tf.string

ชุดที่ครอบคลุมของตัวอย่างของรว์ชุด API ที่มีให้ภายใน การทดสอบ