API de conjunto de datos de Avro

Ver en TensorFlow.org Ejecutar en Google Colab Ver fuente en GitHub Descargar cuaderno

Descripción general

El objetivo de API Avro conjunto de datos es para cargar Avro datos formateados de forma nativa en TensorFlow como TensorFlow conjunto de datos . Avro es un sistema de serialización de datos similar a Protocol Buffers. Se usa ampliamente en Apache Hadoop, donde puede proporcionar un formato de serialización para datos persistentes y un formato de cable para la comunicación entre los nodos de Hadoop. Los datos de Avro son un formato de datos binarios compactado y orientado a filas. Se basa en un esquema que se almacena como un archivo JSON separado. Para la especificación de formato y esquema de Avro declaración, por favor consulte el manual oficial .

Paquete de instalación

Instale el paquete tensorflow-io requerido

pip install tensorflow-io

Importar paquetes

import tensorflow as tf
import tensorflow_io as tfio

Validar importaciones de tf y tfio

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

Uso

Explore el conjunto de datos

Para el propósito de este tutorial, descarguemos el conjunto de datos Avro de muestra.

Descargue un archivo Avro de muestra:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

Descargue el archivo de esquema correspondiente del archivo Avro de muestra:

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

En el ejemplo anterior, se creó un conjunto de datos Avro de prueba basado en el conjunto de datos mnist. El conjunto de datos original en formato mnist TFRecord se genera a partir del TF llamado conjunto de datos . Sin embargo, el conjunto de datos mnist es demasiado grande como conjunto de datos de demostración. Por motivos de simplicidad, la mayor parte se recortó y solo se conservaron los primeros registros. Por otra parte, el recorte adicional se hizo para image de campo de conjunto de datos original y mnist asigna a features de campo en Avro. Así el archivo Avro train.avro tiene 4 registros, cada uno de los cuales tiene 3 campos: features , que es una matriz de int, label , un int o nula, y dataType , una enumeración. Para ver el decodificado train.avro (Nota el archivo de datos Avro original, no es legible como Avro es un formato compactado):

Instale el paquete requerido para leer el archivo Avro:

pip install avro

Para leer e imprimir un archivo Avro en un formato legible por humanos:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

Y el esquema de train.avro que está representado por train.avsc es un archivo con formato JSON. Para ver el train.avsc :

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

Prepare el conjunto de datos

Cargar train.avro como conjunto de datos TensorFlow con Avro API conjunto de datos:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

Las ejemplo convierte anteriores train.avro en el conjunto de datos tensorflow. Cada elemento del conjunto de datos es un diccionario cuya clave es el nombre de la característica, el valor es el tensor denso o escaso convertido. Por ejemplo, convierte features , label , dataType de campo a un VarLenFeature (SparseTensor), FixedLenFeature (DenseTensor), y FixedLenFeature (DenseTensor) respectivamente. Desde batch_size es 3, se coaccionar 3 registros de train.avro en un solo elemento en el conjunto de datos resultado. Para el primer registro en train.avro cuya etiqueta es nulo, lector de Avro lo reemplaza con el valor predeterminado especificado (-100). En este ejemplo, hay 4 registros en total en train.avro . Dado que el tamaño de lote es 3, el conjunto de datos de resultados contiene 3 elementos, el último de los cuales de tamaño del lote es 1. Sin embargo usuario también es capaz de soltar el último lote si el tamaño es más pequeño que el tamaño del lote, permitiendo drop_final_batch . P.ej:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

También se puede aumentar num_parallel_reads para acelerar el procesamiento de datos de Avro aumentando el paralelismo de análisis / lectura de avro.

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

Para el uso detallado de make_avro_record_dataset , por favor refiérase a la documentación del API .

Entrene modelos de tf.keras con el conjunto de datos de Avro

Ahora veamos un ejemplo de un extremo a otro del entrenamiento del modelo tf.keras con un conjunto de datos Avro basado en un conjunto de datos mnist.

Cargar train.avro como conjunto de datos TensorFlow con Avro API conjunto de datos:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

Defina un modelo de keras simple:

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Entrene el modelo de keras con el conjunto de datos de Avro:

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

El conjunto de datos avro puede analizar y convertir cualquier dato avro en tensores de TensorFlow, incluidos registros en registros, mapas, matrices, ramas y enumeraciones. La información de análisis se pasa a la implementación del conjunto de datos avro como un mapa donde las claves codifican cómo analizar los valores de los datos codifican sobre cómo convertir los datos en tensores de TensorFlow, decidiendo el tipo primitivo (p. Ej., Bool, int, long, float, double, string ) así como el tipo de tensor (por ejemplo, escaso o denso). Se proporciona una lista de los tipos de analizadores de TensorFlow (consulte la Tabla 1) y la coerción de los tipos primitivos (Tabla 2).

Tabla 1 los tipos de analizadores TensorFlow admitidos:

Tipos de analizadores de TensorFlow Tensores de TensorFlow Explicación
tf.FixedLenFeature ([], tf.int32) tensor denso Analizar una característica de longitud fija; es decir, todas las filas tienen el mismo número constante de elementos, por ejemplo, solo un elemento o una matriz que siempre tiene el mismo número de elementos para cada fila
tf.SparseFeature (index_key = ['key_1st_index', 'key_2nd_index'], value_key = 'key_value', dtype = tf.int64, size = [20, 50]) tensor escaso Analice una característica dispersa donde cada fila tiene una lista de índices y valores de longitud variable. La 'clave_índice' identifica los índices. La 'clave_valor' identifica el valor. El 'dtype' es el tipo de datos. El 'tamaño' es el valor de índice máximo esperado para cada entrada de índice
tfio.experimental.columnar.VarLenFeatureWithRank ([], tf.int64) tensor escaso Analizar una característica de longitud variable; eso significa que cada fila de datos puede tener un número variable de elementos, por ejemplo, la primera fila tiene 5 elementos, la segunda fila tiene 7 elementos

Tabla 2 la conversión admitida de tipos de Avro a tipos de TensorFlow:

Tipo primitivo avro Tipo primitivo de TensorFlow
booleano: un valor binario tf.bool
bytes: una secuencia de bytes sin firmar de 8 bits tf.string
doble: número de coma flotante IEEE de 64 bits de doble precisión tf.float64
enum: tipo de enumeración tf.string usando el nombre del símbolo
float: número de coma flotante IEEE de 32 bits de precisión simple tf.float32
int: entero de 32 bits con signo tf.int32
long: entero de 64 bits con signo tf.int64
nulo: sin valor usa el valor predeterminado
cadena: secuencia de caracteres unicode tf.string

Un conjunto completo de ejemplos de Avro API conjunto de datos se proporciona dentro de las pruebas .