API Avro Dataset

Veja no TensorFlow.org Executar no Google Colab Ver fonte no GitHub Baixar caderno

Visão geral

O objetivo do Avro Dataset API é carregar Avro formatado dados nativamente em TensorFlow como TensorFlow conjunto de dados . Avro é um sistema de serialização de dados semelhante ao Protocol Buffers. É amplamente usado no Apache Hadoop, onde pode fornecer um formato de serialização para dados persistentes e um formato de fio para comunicação entre nós do Hadoop. Os dados Avro são um formato de dados binários compactado orientado a linhas. Ele se baseia no esquema que é armazenado como um arquivo JSON separado. Para a especificação de Avro formato e esquema de declaração, consulte o manual oficial .

Pacote de configuração

Instale o pacote tensorflow-io necessário

pip install tensorflow-io

Importar pacotes

import tensorflow as tf
import tensorflow_io as tfio

Validar importações de tf e tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

Uso

Explorar o conjunto de dados

Para os fins deste tutorial, vamos baixar o conjunto de dados Avro de exemplo.

Baixe um arquivo Avro de amostra:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Faça download do arquivo de esquema correspondente do arquivo Avro de amostra:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

No exemplo acima, um conjunto de dados Avro de teste foi criado com base no conjunto de dados mnist. O conjunto de dados mnist original no formato TFRecord é gerada a partir TF chamado conjunto de dados . No entanto, o conjunto de dados mnist é muito grande como um conjunto de dados de demonstração. Para fins de simplicidade, a maior parte foi cortada e apenas os primeiros registros foram mantidos. Além disso, corte adicional foi feito por image campo no conjunto de dados mnist original e mapeados para features de campo em Avro. Portanto, o arquivo avro train.avro tem 4 registros, cada um dos quais tem 3 campos: features , que é uma matriz de int, label , um int ou nulo, e dataType , uma enumeração. Para visualizar a decodificado train.avro (Nota do arquivo de dados avro originais não é legível como Avro é um formato compactado):

Instale o pacote necessário para ler o arquivo Avro:

pip install avro

Para ler e imprimir um arquivo Avro em um formato legível:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

E o esquema de train.avro que é representado por train.avsc é um arquivo formatado em JSON. Para visualizar a train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Preparar o conjunto de dados

Carga train.avro como TensorFlow conjunto de dados com Avro conjunto de dados API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

O exemplo acima convertidos train.avro em conjunto de dados tensorflow. Cada elemento do conjunto de dados é um dicionário cuja chave é o nome do recurso, valor é o tensor esparso ou denso convertido. Por exemplo, ele converte features , label , dataType campo para um VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor) e FixedLenFeature (DenseTensor), respectivamente. Desde batch_size é 3, que coagir 3 registros de train.avro em um elemento no conjunto de dados resultado. Para o primeiro registro no train.avro cujo rótulo é nulo, leitor substitui avro com o valor padrão especificado (-100). Neste exemplo, não é 4 registros no total, em train.avro . Desde tamanho do lote é de 3, o conjunto de dados de resultados contém 3 elementos, última das quais do tamanho do lote é 1. No entanto usuário também é capaz de soltar o último lote se o tamanho é menor do que o tamanho do lote, permitindo drop_final_batch . Por exemplo:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Pode-se também aumentar num_parallel_reads para agilizar o processamento de dados Avro aumentando o paralelismo avro parse/read.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Para uso detalhado de make_avro_record_dataset , consulte doc API .

Treinar modelos tf.keras com o conjunto de dados Avro

Agora vamos analisar um exemplo completo de treinamento do modelo tf.keras com o conjunto de dados Avro baseado no conjunto de dados mnist.

Carga train.avro como TensorFlow conjunto de dados com Avro conjunto de dados API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Defina um modelo keras simples:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Treine o modelo keras com o conjunto de dados Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

O conjunto de dados avro pode analisar e forçar qualquer dado avro em tensores do TensorFlow, incluindo registros em registros, mapas, matrizes, ramificações e enumerações. As informações de análise são passadas para a implementação do conjunto de dados avro como um mapa onde as chaves codificam como analisar os valores de dados codificados sobre como coagir os dados em tensores TensorFlow – decidindo o tipo primitivo (por exemplo, bool, int, long, float, double, string ), bem como o tipo de tensor (por exemplo, esparso ou denso). Uma lista dos tipos de analisador do TensorFlow (consulte a Tabela 1) e a coerção de tipos primitivos (Tabela 2) é fornecida.

Tabela 1 os tipos de analisador TensorFlow compatíveis:

Tipos de analisador do TensorFlow Tensores de fluxo Explicação
tf.FixedLenFeature([], tf.int32) tensor denso Analisar um recurso de comprimento fixo; isto é, todas as linhas têm o mesmo número constante de elementos, por exemplo, apenas um elemento ou uma matriz que tem sempre o mesmo número de elementos para cada linha
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) tensor esparso Analise um recurso esparso em que cada linha tenha uma lista de índices e valores de comprimento variável. O 'index_key' identifica os índices. O 'value_key' identifica o valor. O 'dtype' é o tipo de dados. O 'tamanho' é o valor de índice máximo esperado para cada entrada de índice
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) tensor esparso Analisar um recurso de comprimento variável; isso significa que cada linha de dados pode ter um número variável de elementos, por exemplo, a 1ª linha tem 5 elementos, a 2ª linha tem 7 elementos

Tabela 2 a conversão suportada de tipos Avro para tipos do TensorFlow:

Tipo primitivo Avro Tipo primitivo do TensorFlow
boolean: um valor binário tf.bool
bytes: uma sequência de bytes sem sinal de 8 bits tf.string
double: número de ponto flutuante IEEE de 64 bits de precisão dupla tf.float64
enum: tipo de enumeração tf.string usando o nome do símbolo
float: número de ponto flutuante IEEE de 32 bits de precisão simples tf.float32
int: inteiro com sinal de 32 bits tf.int32
long: inteiro com sinal de 64 bits tf.int64
null: nenhum valor usa o valor padrão
string: sequência de caracteres unicode tf.string

Um conjunto abrangente de exemplos de Avro conjunto de dados API é fornecido dentro dos testes .