API Avro Dataset

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza l'origine su GitHub Scarica quaderno

Panoramica

L'obiettivo di Avro Dataset API è caricare Avro dati formattati in modo nativo tensorflow come tensorflow set di dati . Avro è un sistema di serializzazione dei dati simile a Protocol Buffers. È ampiamente utilizzato in Apache Hadoop dove può fornire sia un formato di serializzazione per dati persistenti, sia un formato wire per la comunicazione tra i nodi Hadoop. I dati Avro sono un formato di dati binari compatti e orientato alla riga. Si basa su uno schema che viene archiviato come file JSON separato. Per le specifiche di formato e lo schema Avro dichiarazione, si prega di fare riferimento al manuale ufficiale .

Pacchetto di installazione

Installa il pacchetto tensorflow-io richiesto

pip install tensorflow-io

Importa pacchetti

import tensorflow as tf
import tensorflow_io as tfio

Convalida le importazioni tf e tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

Utilizzo

Esplora il set di dati

Ai fini di questo tutorial, scarichiamo il set di dati Avro di esempio.

Scarica un file Avro di esempio:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Scarica il file di schema corrispondente del file Avro di esempio:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

Nell'esempio precedente, è stato creato un set di dati Avro di test basato sul set di dati mnist. Il set di dati mnist originale in formato TFRecord è generato dal TF di nome set di dati . Tuttavia, il set di dati mnist è troppo grande come set di dati demo. Per semplicità, la maggior parte è stata tagliata e solo i primi record sono stati conservati. Inoltre, ulteriore taglio è stato fatto per image campo nel set di dati mnist originale e mappato a features campo in Avro. Quindi il file avro train.avro ha 4 registri, ciascuno dei quali ha 3 campi: features , che è un array di int, label , un int o nullo, e dataType , un enum. Per visualizzare il decodificato train.avro (Nota il file di dati Avro originale non è leggibile come Avro è un formato compatto):

Installa il pacchetto richiesto per leggere il file Avro:

pip install avro

Per leggere e stampare un file Avro in un formato leggibile:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

E lo schema di train.avro che è rappresentata da train.avsc è un file in formato JSON. Per visualizzare il train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Prepara il set di dati

Carico train.avro come tensorflow insieme di dati con Avro di dati API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

L'esempio converte sopra train.avro nel set di dati tensorflow. Ogni elemento del set di dati è un dizionario la cui chiave è il nome della funzione, il valore è il tensore sparso o denso convertito. Ad esempio, converte features , label , dataType campo per un VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor), e FixedLenFeature (DenseTensor) rispettivamente. Poiché batch_size è 3, è costringere 3 record da train.avro in un solo elemento nel set di dati risultato. Per il primo record train.avro cui etichetta è nullo, Avro lettore sostituisce con il valore predefinito specificato (-100). In questo esempio, ci sei 4 records in totale in train.avro . Poiché dimensione del lotto è 3, l'insieme di dati risultato contiene 3 elementi, di ultima delle quali dimensione del lotto è 1. Tuttavia utente è in grado di rilasciare l'ultimo lotto se la dimensione è più piccola dimensione del lotto abilitando drop_final_batch . Per esempio:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Si può anche aumentare num_parallel_reads per velocizzare l'elaborazione dei dati Avro aumentando il parallelismo di avro parse/read.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Per l'utilizzo dettagliato di make_avro_record_dataset , si prega di fare riferimento a API doc .

Addestra i modelli tf.keras con il set di dati Avro

Ora esaminiamo un esempio end-to-end di training del modello tf.keras con il set di dati Avro basato sul set di dati mnist.

Carico train.avro come tensorflow insieme di dati con Avro di dati API:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Definire un semplice modello keras:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Addestra il modello keras con il set di dati Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

Il set di dati avro può analizzare e forzare qualsiasi dato avro nei tensori TensorFlow, inclusi record in record, mappe, array, rami ed enumerazioni. Le informazioni sull'analisi vengono passate all'implementazione del set di dati avro come una mappa in cui le chiavi codificano come analizzare i valori dei dati codificano come forzare i dati nei tensori di TensorFlow, decidendo il tipo primitivo (ad es. bool, int, long, float, double, string ) così come il tipo di tensore (es. sparso o denso). Viene fornito un elenco dei tipi di parser di TensorFlow (vedere Tabella 1) e la coercizione dei tipi primitivi (Tabella 2).

Tabella 1 i tipi di parser TensorFlow supportati:

Tipi di parser TensorFlow TensorFlow Tensors Spiegazione
tf.FixedLenFeature([], tf.int32) tensore denso Analizza una funzione di lunghezza fissa; ovvero tutte le righe hanno lo stesso numero costante di elementi, ad esempio un solo elemento o un array che ha sempre lo stesso numero di elementi per ogni riga
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) tensore sparso Analizza una funzione sparsa in cui ogni riga ha un elenco di indici e valori di lunghezza variabile. La 'index_key' identifica gli indici. Il 'valore_chiave' identifica il valore. Il 'dtype' è il tipo di dati. La "dimensione" è il valore massimo di indice previsto per ciascuna voce di indice
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) tensore sparso Analizza una funzione di lunghezza variabile; ciò significa che ogni riga di dati può avere un numero variabile di elementi, ad esempio la 1a riga ha 5 elementi, la 2a riga ha 7 elementi

Tabella 2 la conversione supportata dai tipi Avro ai tipi di TensorFlow:

Tipo primitivo Avro Tipo primitivo TensorFlow
booleano: un valore binario tf.bool
byte: una sequenza di byte senza segno a 8 bit tf.string
double: numero in virgola mobile IEEE a 64 bit a doppia precisione tf.float64
enum: tipo di enumerazione tf.string utilizzando il nome del simbolo
float: numero in virgola mobile IEEE a 32 bit a precisione singola tf.float32
int: intero con segno a 32 bit tf.int32
long: intero con segno a 64 bit tf.int64
nullo: nessun valore utilizza il valore predefinito
stringa: sequenza di caratteri unicode tf.string

Un set completo di esempi di Avro set di dati API viene fornita entro le prove .