에이브로 데이터셋 API

TensorFlow.org에서 보기 Google Colab에서 실행 GitHub에서 소스 보기 노트북 다운로드

개요

아 브로 데이터 집합 API의 목적은 브로과 같이 TensorFlow에 기본적으로 형식의 데이터를로드하는 것입니다 TensorFlow 데이터 세트 . Avro는 프로토콜 버퍼와 유사한 데이터 직렬화 시스템입니다. 영구 데이터를 위한 직렬화 형식과 Hadoop 노드 간의 통신을 위한 유선 형식을 모두 제공할 수 있는 Apache Hadoop에서 널리 사용됩니다. Avro 데이터는 행 지향의 압축된 이진 데이터 형식입니다. 별도의 JSON 파일로 저장된 스키마에 의존합니다. 아 브로 형식 및 스키마 선언의 명세를 들어, 참조하시기 바랍니다 공식 매뉴얼 .

설치 패키지

필요한 tensorflow-io 패키지 설치

pip install tensorflow-io

패키지 가져오기

import tensorflow as tf
import tensorflow_io as tfio

tf 및 tfio 가져오기 확인

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

용법

데이터세트 탐색

이 자습서의 목적을 위해 샘플 Avro 데이터 세트를 다운로드하겠습니다.

샘플 Avro 파일 다운로드:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

샘플 Avro 파일의 해당 스키마 파일을 다운로드합니다.

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

위의 예에서는 mnist 데이터셋을 기반으로 테스트용 Avro 데이터셋을 생성했습니다. TFRecord 형식의 원래 mnist 데이터 세트가 생성됩니다 TF라는 이름의 데이터 세트 . 그러나 mnist 데이터셋은 데모 데이터셋으로 너무 큽니다. 단순함을 위해 대부분을 잘라내고 처음 몇 개의 기록만 보관했습니다. 또한, 추가 트리밍에 대해 수행 된 image 원래 mnist 데이터 집합의 필드에 매핑 features 브로에서 필드. 브로 파일 그래서 train.avro 3 개 필드 각각 갖는 4 개 기록 보유 features , INT의 배열 label , int로 또는 널 및 dataType , 열거한다. 디코딩 보려면 train.avro (주 일본어 브로 데이터 파일은 브로는 압축 형식으로 사람이 읽을 수 없다)

Avro 파일을 읽는 데 필요한 패키지를 설치합니다.

pip install avro

사람이 읽을 수 있는 형식으로 Avro 파일을 읽고 인쇄하려면:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

그리고의 스키마 train.avro 로 표현된다 train.avsc JSON 형식의 파일입니다. 보기에 train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

데이터세트 준비

로드 train.avro 브로 데이터 세트의 API와 TensorFlow 데이터 세트로 :

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

상기 예시적인 변환은 train.avro tensorflow 세트로. 데이터 세트의 각 요소는 키가 기능 이름이고 값이 변환된 희소 또는 밀집 텐서인 사전입니다. 예, 그것은 변환 features , label , dataType 각각 VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor), 및 FixedLenFeature (DenseTensor)에 필드. BATCH_SIZE가 3이기 때문에, 3 개 기록 강제 train.avro 결과 데이터 세트에서 하나 개의 원소로. 첫 번째 레코드를 들어 train.avro 그 라벨 null의 경우, 지정된 디폴트 값 (-100)와 브로 리더 대체합니다을. 이 예에서 총 4 개 기록이있어 train.avro . 배치 크기가 3이기 때문에, 결과 세트는 3 개 요소, 사용자는 크기가 활성화하여 배치 크기보다 작다면 최종 배치를 드롭 할 수있다 그러나 마지막의 일괄 크기가 1 인 포함 drop_final_batch . 예:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

또한 num_parallel_reads를 늘려 avro 구문 분석/읽기 병렬 처리를 높여 Avro 데이터 처리를 촉진할 수 있습니다.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

자세한 사용에 대한 make_avro_record_dataset , 참조하시기 바랍니다 API의 문서 .

Avro 데이터 세트로 tf.keras 모델 학습

이제 mnist 데이터 세트를 기반으로 하는 Avro 데이터 세트를 사용하여 tf.keras 모델 교육의 종단 간 예제를 살펴보겠습니다.

로드 train.avro 브로 데이터 세트의 API와 TensorFlow 데이터 세트로 :

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

간단한 케라스 모델 정의:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Avro 데이터 세트를 사용하여 keras 모델 학습:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

avro 데이터 세트는 레코드, 맵, 배열, 분기 및 열거의 레코드를 포함하여 모든 avro 데이터를 구문 분석하고 TensorFlow 텐서로 강제 변환할 수 있습니다. 구문 분석 정보는 키가 데이터 값을 구문 분석하는 방법을 인코딩하는 맵으로 데이터를 TensorFlow 텐서로 강제 변환하는 방법을 인코딩하는 맵으로 전달됩니다. 기본 유형(예: bool, int, long, float, double, string)을 결정합니다. ) 뿐만 아니라 텐서 유형(예: 희소 또는 밀집). TensorFlow의 파서 유형 목록(표 1 참조)과 기본 유형의 강제 변환(표 2)이 제공됩니다.

표 1 지원되는 TensorFlow 파서 유형:

TensorFlow 파서 유형 텐서플로우 텐서 설명
tf.FixedLenFeature([], tf.int32) 조밀한 텐서 고정 길이 기능을 구문 분석합니다. 즉, 모든 행은 동일한 상수 수의 요소를 갖습니다. 예를 들어 하나의 요소 또는 각 행에 대해 항상 동일한 수의 요소를 갖는 배열
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, 크기=[20, 50]) 희소 텐서 각 행에 인덱스 및 값의 가변 길이 목록이 있는 희소 특성을 구문 분석합니다. 'index_key'는 인덱스를 식별합니다. 'value_key'는 값을 식별합니다. 'dtype'은 데이터 유형입니다. '크기'는 각 인덱스 항목에 대해 예상되는 최대 인덱스 값입니다.
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) 희소 텐서 가변 길이 기능을 구문 분석합니다. 즉, 각 데이터 행은 다양한 수의 요소를 가질 수 있습니다. 예를 들어 첫 번째 행에는 5개의 요소가 있고 두 번째 행에는 7개의 요소가 있습니다.

표 2 Avro 유형에서 TensorFlow 유형으로 지원되는 변환:

Avro 원시 유형 TensorFlow 기본 유형
부울: 이진 값 tf.bool
바이트열: 8비트 무부호 바이트 시퀀스 tf.string
double: 배정밀도 64비트 IEEE 부동 소수점 숫자 tf.float64
열거형: 열거형 기호 이름을 사용하는 tf.string
float: 단정밀도 32비트 IEEE 부동 소수점 숫자 tf.float32
int: 32비트 부호 있는 정수 tf.int32
long: 64비트 부호 있는 정수 tf.int64
null: 값 없음 기본값 사용
string: 유니코드 문자 시퀀스 tf.string

아 브로 데이터 세트의 API의 예의 포괄적 인 세트 내에서 제공되는 테스트 .