AvroデータセットAPI

TensorFlow.orgで表示GoogleColabで実行GitHubでソースを表示 ノートブックをダウンロード

概要

アブロデータセットAPIの目的は、アブロとしてTensorFlowにネイティブデータをフォーマットしロードすることですTensorFlowデータセット。 Avroは、ProtocolBuffersに類似したデータシリアル化システムです。 Apache Hadoopで広く使用されており、永続データのシリアル化形式と、Hadoopノード間の通信用のワイヤー形式の両方を提供できます。 Avroデータは、行指向の圧縮されたバイナリデータ形式です。これは、個別のJSONファイルとして保存されているスキーマに依存しています。アブロ・フォーマットとスキーマ宣言の仕様については、を参照してください公式マニュアル

セットアップパッケージ

必要なtensorflow-ioパッケージをインストールします

pip install tensorflow-io

パッケージをインポートする

import tensorflow as tf
import tensorflow_io as tfio

tfおよびtfioのインポートを検証します

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

使用法

データセットを探索する

このチュートリアルの目的のために、サンプルのAvroデータセットをダウンロードしましょう。

サンプルのAvroファイルをダウンロードします。

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

サンプルAvroファイルの対応するスキーマファイルをダウンロードします。

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

上記の例では、テスト用のAvroデータセットがmnistデータセットに基づいて作成されています。 TFRecord形式で元mnistデータセットは以下から生成されTFという名前のデータセット。ただし、mnistデータセットはデモデータセットとしては大きすぎます。簡単にするために、そのほとんどはトリミングされ、最初のいくつかのレコードのみが保持されました。また、追加のトリミングのために行われたimageオリジナルmnistデータセット内のフィールドとにマッピングされfeaturesアブロのフィールド。アブロファイルのでtrain.avro :3つの分野それぞれ有する4つのレコードを、持っているfeatures intの配列、である、 label 、int型はnull、およびdataType 、列挙型を。デコード表示するtrain.avro (注元アブロデータファイルはアブロが圧縮形式であるように人間が読める形式ではありません)。

Avroファイルを読み取るために必要なパッケージをインストールします。

pip install avro

人間が読める形式でAvroファイルを読み取って印刷するには:

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

そして、のスキーマtrain.avroで表されるtrain.avsc JSON形式のファイルです。表示するにはtrain.avsc

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

データセットを準備する

ロードtrain.avroアブロデータセットのAPIとTensorFlowデータセットと:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

上記の例の変換はtrain.avro tensorflowデータセットに。データセットの各要素は辞書であり、そのキーは機能名であり、値は変換されたスパースまたはデンステンソルです。例えば、それは変換featureslabeldataTypeそれぞれVarLenFeature(SparseTensor)、FixedLenFeature(DenseTensor)、およびFixedLenFeature(DenseTensor)にフィールドを。 BATCH_SIZEが3であるので、3からレコードを強制train.avro結果セット内の1つの要素に。最初のレコードのためにtrain.avroラベルがnullの場合、アブロ読者は(-100)デフォルト値を指定して、それを置き換えます。この例では、中に合計で4つのレコードがそこにいるtrain.avro 。バッチサイズは3であるため、結果のデータセットは3つの要素、ユーザは、サイズが可能にすることによってバッチサイズよりも小さい場合、最後のバッチを削除することが可能であるが、最後のバッチサイズは1である含有drop_final_batch 。例えば:

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

また、avroの解析/読み取りの並列処理を増やすことで、num_parallel_readsを増やしてAvroデータ処理を拡張することもできます。

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

詳細な使用方法についてmake_avro_record_dataset 、を参照してくださいAPIドキュメント

Avroデータセットを使用してtf.kerasモデルをトレーニングする

次に、mnistデータセットに基づくAvroデータセットを使用したtf.kerasモデルトレーニングのエンドツーエンドの例を見ていきましょう。

ロードtrain.avroアブロデータセットのAPIとTensorFlowデータセットと:

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

単純なkerasモデルを定義します。

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

Avroデータセットを使用してkerasモデルをトレーニングします。

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

avroデータセットは、レコード、マップ、配列、ブランチ、列挙型のレコードを含む、任意のavroデータを解析してTensorFlowテンソルに強制することができます。解析情報は、データを解析する方法をキーがエンコードするマップとしてavroデータセットの実装に渡され、データをTensorFlowテンソルに強制する方法をエンコードします-プリミティブ型(bool、int、long、float、double、stringなど)を決定します)およびテンソルタイプ(スパースまたはデンスなど)。 TensorFlowのパーサータイプ(表1を参照)とプリミティブ型の強制(表2)のリストが提供されています。

表1サポートされているTensorFlowパーサータイプ:

TensorFlowパーサータイプTensorFlowテンソル説明
tf.FixedLenFeature([]、tf.int32)密なテンソル固定長フィーチャーを解析します。つまり、すべての行に同じ一定数の要素があります。たとえば、1つの要素だけ、または各行に常に同じ数の要素がある配列などです。
tf.SparseFeature(index_key = ['key_1st_index'、 'key_2nd_index']、value_key = 'key_value'、dtype = tf.int64、size = [20、50])スパーステンソル各行にインデックスと値の可変長リストがあるスパース機能を解析します。 'index_key'はインデックスを識別します。 'value_key'は値を識別します。 'dtype'はデータ型です。 「サイズ」は、各インデックスエントリの予想される最大インデックス値です。
tfio.experimental.columnar.VarLenFeatureWithRank([]、tf.int64)スパーステンソル可変長フィーチャーを解析します。つまり、各データ行には可変数の要素を含めることができます。たとえば、最初の行には5つの要素があり、2番目の行には7つの要素があります。

表2AvroタイプからTensorFlowのタイプへのサポートされている変換:

Avroプリミティブ型TensorFlowプリミティブタイプ
ブール値:バイナリ値tf.bool
バイト:8ビットの符号なしバイトのシーケンスtf.string
double:倍精度64ビットIEEE浮動小数点数tf.float64
enum:列挙型シンボル名を使用したtf.string
float:単精度32ビットIEEE浮動小数点数tf.float32
int:32ビット符号付き整数tf.int32
long:64ビット符号付き整数tf.int64
null:値なしデフォルト値を使用
文字列:Unicode文字シーケンスtf.string

アブロデータセットのAPIの例の包括的なセットを内に設けられているテスト