Avro Dataset API

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

Цель Avro Dataset API является загрузка данных в формате Avro изначально в качестве TensorFlow TensorFlow набора данных . Avro — это система сериализации данных, похожая на Protocol Buffers. Он широко используется в Apache Hadoop, где может предоставлять как формат сериализации для постоянных данных, так и проводной формат для связи между узлами Hadoop. Данные Avro представляют собой сжатый формат двоичных данных, ориентированный на строки. Он основан на схеме, которая хранится в виде отдельного файла JSON. Для спецификации формата и схемы Avro декларации, пожалуйста , обратитесь к официальному руководству .

Пакет установки

Установите необходимый пакет tensorflow-io

pip install tensorflow-io

Импорт пакетов

import tensorflow as tf
import tensorflow_io as tfio

Проверка импорта tf и tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

использование

Исследуйте набор данных

Для целей этого руководства давайте загрузим образец набора данных Avro.

Загрузите образец файла Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Загрузите соответствующий файл схемы образца файла Avro:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

В приведенном выше примере тестовый набор данных Avro был создан на основе набора данных mnist. Оригинальный mnist набор данных в формате TFRecord генерируется из ТФ имени набора данных . Однако набор данных mnist слишком велик для демонстрационного набора данных. Для простоты большая его часть была обрезана, и были сохранены только первые несколько записей. Кроме того, дополнительная подгонка была сделана для image поля в оригинальном mnist набор данных и отображается его features поля в Avro. Так Avro файл train.avro имеет 4 записей, каждая из которых имеет 3 поля: features , которая является массивом междунар, label , в междунар или нуль, и dataType , перечисление. Для просмотра декодированного train.avro (Примечания исходного файла данных AVRO не читаемый человека , как Avro является уплотненным форматом):

Установите необходимый пакет для чтения файла Avro:

pip install avro

Чтобы прочитать и распечатать файл Avro в удобочитаемом формате:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

И схема train.avro которая представлена train.avsc является JSON-файл в формате. Для просмотра train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Подготовьте набор данных

Нагрузка train.avro в TensorFlow набора данных Avro набора данных API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

В приведенном выше примере преобразует train.avro в tensorflow набора данных. Каждый элемент набора данных представляет собой словарь, ключом которого является имя объекта, значением является преобразованный разреженный или плотный тензор. Например, он преобразует features , label , dataType поля к VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor) и FixedLenFeature (DenseTensor) соответственно. Так как batch_size равно 3, то принуждать 3 записи из train.avro в один элемент в результирующий набор данных. Для первой записи в train.avro , метка которого равна нулю, Avro считыватель заменяет его с заданным значением по умолчанию (-100). В этом примере вы 4 записей в общей сложности в train.avro . Так как размер пакета составляет 3, в результате набор данных содержит 3 элемента, последний из которых - х размер пакета равен 1. Однако пользователь также может отказаться от последней партии , если размер меньше , чем размер партии, позволяя drop_final_batch . Например:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Можно также увеличить num_parallel_reads, чтобы ускорить обработку данных Avro за счет увеличения параллелизма разбора/чтения avro.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Для детального использования make_avro_record_dataset , пожалуйста , обратитесь к API док .

Обучите модели tf.keras с набором данных Avro

Теперь давайте рассмотрим сквозной пример обучения модели tf.keras с набором данных Avro на основе набора данных mnist.

Нагрузка train.avro в TensorFlow набора данных Avro набора данных API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Определите простую модель keras:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Обучите модель keras с набором данных Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

Набор данных avro может анализировать и преобразовывать любые данные avro в тензоры TensorFlow, включая записи в записях, карты, массивы, ветви и перечисления. Информация об анализе передается в реализацию набора данных avro в виде карты, где ключи кодируют, как анализировать значения данных, кодируют то, как преобразовать данные в тензоры TensorFlow — определяя тип примитива (например, bool, int, long, float, double, string ), а также тип тензора (например, разреженный или плотный). Предоставляется список типов парсеров TensorFlow (см. Таблицу 1) и приведение примитивных типов (Таблица 2).

Таблица 1. Поддерживаемые типы парсеров TensorFlow:

Типы парсеров TensorFlow Тензоры TensorFlow Объяснение
tf.FixedLenFeature([], tf.int32) плотный тензор Разобрать объект фиксированной длины; то есть все строки имеют одинаковое постоянное количество элементов, например, только один элемент или массив, который всегда имеет одинаковое количество элементов для каждой строки.
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) разреженный тензор Проанализируйте разреженный объект, где каждая строка имеет список индексов и значений переменной длины. 'index_key' идентифицирует индексы. 'value_key' идентифицирует значение. «dtype» — это тип данных. «Размер» — это ожидаемое максимальное значение индекса для каждой записи индекса.
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) разреженный тензор Разобрать объект переменной длины; это означает, что каждая строка данных может иметь переменное количество элементов, например, 1-я строка имеет 5 элементов, 2-я строка имеет 7 элементов.

Таблица 2 поддерживаемое преобразование из типов Avro в типы TensorFlow:

Авро примитивный тип Примитивный тип TensorFlow
логическое значение: двоичное значение tf.bool
байты: последовательность 8-битных байтов без знака tf.string
double: 64-битное число двойной точности IEEE с плавающей запятой tf.float64
enum: тип перечисления tf.string с использованием имени символа
float: 32-битное число с плавающей запятой одинарной точности IEEE tf.float32
int: 32-битное целое число со знаком tf.int32
long: 64-битное целое число со знаком tf.int64
ноль: нет значения использует значение по умолчанию
строка: последовательность символов Юникода tf.string

Полный набор примеров Avro набора данных API предоставляется в рамках испытаний .