MLコミュニティデーは11月9日です! TensorFlow、JAXからの更新のために私たちに参加し、より多くの詳細をご覧ください

Tensorflowデータ検証を開始する

Tensorflow Data Validation(TFDV)は、トレーニングと提供データを分析して次のことを行うことができます。

コアAPIは、各機能をサポートし、その上に構築され、ノートブックのコンテキストで呼び出すことができる便利なメソッドを備えています。

記述データ統計の計算

TFDVは、存在する機能とその値の分布の形状に関するデータの概要を提供する記述統計を計算できます。ファセット概要などのツールを使用すると、これらの統計を簡潔に視覚化して簡単に閲覧できます。

たとえば、 pathTFRecord形式のファイル( tensorflow.Exampleタイプのレコードを保持している)を指しているとします。次のスニペットは、TFDVを使用した統計の計算を示しています。

    stats = tfdv.generate_statistics_from_tfrecord(data_location=path)

戻り値はDatasetFeatureStatisticsListプロトコルバッファです。 サンプルノートブックには、ファセットの概要を使用した統計の視覚化が含まれています。

    tfdv.visualize_statistics(stats)

統計の視覚化のスクリーンショット

前の例では、データがTFRecordファイルに保管されていることを前提としています。 TFDVはCSV入力形式もサポートしており、他の一般的な形式にも拡張できます。利用可能なデータデコーダーはここにあります。さらに、TFDVは、パンダDataFrameとして表されるメモリ内データを持つユーザーにtfdv.generate_statistics_from_dataframeユーティリティ関数を提供します。

TFDVは、デフォルトのデータ統計セットを計算するだけでなく、セマンティックドメイン(画像、テキストなど)の統計も計算できます。セマンティックドメイン統計の計算を有効にするには、 enable_semantic_domain_statsをTrueに設定したtfdv.StatsOptionsオブジェクトをtfdv.generate_statistics_from_tfrecordます。

GoogleCloudで実行

内部的には、TFDVはApache Beamのデータ並列処理フレームワークを使用して、大規模なデータセットの統計計算をスケーリングします。 TFDVとより深く統合したいアプリケーション(たとえば、データ生成パイプラインの最後に統計生成をアタッチする、カスタム形式でデータの統計を生成する)の場合、APIは統計生成用のBeamPTransformも公開します。

Google CloudでTFDVを実行するには、TFDVホイールファイルをダウンロードしてDataflowワーカーに提供する必要があります。次のように、wheelファイルを現在のディレクトリにダウンロードします。

pip download tensorflow_data_validation \
  --no-deps \
  --platform manylinux2010_x86_64 \
  --only-binary=:all:

次のスニペットは、GoogleCloudでのTFDVの使用例を示しています。


import tensorflow_data_validation as tfdv
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

PROJECT_ID = ''
JOB_NAME = ''
GCS_STAGING_LOCATION = ''
GCS_TMP_LOCATION = ''
GCS_DATA_LOCATION = ''
# GCS_STATS_OUTPUT_PATH is the file path to which to output the data statistics
# result.
GCS_STATS_OUTPUT_PATH = ''

PATH_TO_WHL_FILE = ''


# Create and set your PipelineOptions.
options = PipelineOptions()

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = GCS_STAGING_LOCATION
google_cloud_options.temp_location = GCS_TMP_LOCATION
options.view_as(StandardOptions).runner = 'DataflowRunner'

setup_options = options.view_as(SetupOptions)
# PATH_TO_WHL_FILE should point to the downloaded tfdv wheel file.
setup_options.extra_packages = [PATH_TO_WHL_FILE]

tfdv.generate_statistics_from_tfrecord(GCS_DATA_LOCATION,
                                       output_path=GCS_STATS_OUTPUT_PATH,
                                       pipeline_options=options)

この場合、生成された統計プロトが書き込まTFRecordファイルに格納されてGCS_STATS_OUTPUT_PATH

NOTEのいずれかを呼び出すときtfdv.generate_statistics_...機能(例えば、 tfdv.generate_statistics_from_tfrecord Googleクラウド上に)、あなたが提供しなければならないoutput_path 。 Noneを指定すると、エラーが発生する場合があります。

データからスキーマを推測する

スキーマは、データの予想されるプロパティを記述します。これらのプロパティの一部は次のとおりです。

  • どの機能が存在すると予想されるか
  • それらのタイプ
  • 各例の機能の値の数
  • すべての例にわたる各機能の存在
  • 機能の予想されるドメイン。

要するに、スキーマは「正しい」データへの期待を記述しているため、データのエラーを検出するために使用できます(以下で説明)。さらに、同じスキーマを使用して、データ変換用のTensorflowTransformを設定できます。スキーマはかなり静的であると予想されることに注意してください。たとえば、複数のデータセットが同じスキーマに準拠できるのに対し、統計(上記)はデータセットごとに異なる可能性があります。

スキーマの作成は、特に多くの機能を備えたデータセットの場合、面倒な作業になる可能性があるため、TFDVは、記述統計に基づいてスキーマの初期バージョンを生成する方法を提供します。

    schema = tfdv.infer_schema(stats)

一般に、TFDVは保守的なヒューリスティックを使用して、スキーマを特定のデータセットに過剰適合させないように、統計から安定したデータプロパティを推測します。 TFDVのヒューリスティックが見逃した可能性のあるデータに関するドメイン知識を取得するために推測されたスキーマ確認し、必要に応じてそれを改良することを強くお勧めします

デフォルトでは、 tfdv.infer_schemaあれば、必要な機能のそれぞれの形状を推測value_count.min等しいvalue_count.max機能するために。形状の推論を無効にするには、 infer_feature_shape引数をFalseに設定します。

スキーマ自体はスキーマプロトコルバッファとして保存されるため、標準のプロトコルバッファAPIを使用して更新/編集できます。 TFDVには、これらの更新を簡単にするためのユーティリティメソッドいくつか用意されています。たとえば、単一の値をとる必須の文字列機能payment_typeを記述するために、スキーマに次のスタンザが含まれているとします。

feature {
  name: "payment_type"
  value_count {
    min: 1
    max: 1
  }
  type: BYTES
  domain: "payment_type"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}

例の少なくとも50%で機能を設定する必要があることを示すには、次のようにします。

    tfdv.get_feature(schema, 'payment_type').presence.min_fraction = 0.5

サンプルノートブックには、スキーマをテーブルとして簡単に視覚化したものが含まれており、スキーマにエンコードされた各機能とその主な特性が一覧表示されています。

スキーマの視覚化のスクリーンショット

データにエラーがないか確認する

スキーマが与えられると、データセットがスキーマに設定された期待に準拠しているかどうか、またはデータの異常が存在するかどうかを確認できます。 (a)データセットの統計をスキーマと照合することにより、データセット全体の集計でエラーがないかデータをチェックできます。または、(b)例ごとにエラーをチェックできます。

データセットの統計をスキーマと照合する

集計のエラーをチェックするために、TFDVはデータセットの統計をスキーマと照合し、不一致をマークします。例えば:

    # Assume that other_path points to another TFRecord file
    other_stats = tfdv.generate_statistics_from_tfrecord(data_location=other_path)
    anomalies = tfdv.validate_statistics(statistics=other_stats, schema=schema)

結果は異常プロトコルバッファのインスタンスであり、統計がスキーマと一致しないエラーを記述します。たとえば、 other_pathのデータに、スキーマで指定されたドメイン外の機能payment_type値を持つ例が含まれているとします。

これにより異常が発生します

   payment_type  Unexpected string values  Examples contain values missing from the schema: Prcard (<1%).

ドメイン外の値が機能値の1%未満の統計で見つかったことを示します。

これが予期されていた場合、スキーマは次のように更新できます。

   tfdv.get_domain(schema, 'payment_type').value.append('Prcard')

異常が本当にデータエラーを示している場合は、トレーニングに使用する前に、基になるデータを修正する必要があります。

このモジュールで検出できるさまざまな異常タイプをここにリストします

サンプルノートブックには、異常がテーブルとして簡単に視覚化されており、エラーが検出された機能と各エラーの簡単な説明がリストされています。

異常のスクリーンショット

例ごとにエラーをチェックする

TFDVには、データセット全体の統計をスキーマと比較する代わりに、例ごとにデータを検証するオプションもあります。 TFDVは、例ごとにデータを検証し、見つかった異常な例の要約統計量を生成するための関数を提供します。例えば:

   options = tfdv.StatsOptions(schema=schema)
   anomalous_example_stats = tfdv.validate_examples_in_tfrecord(
       data_location=input, stats_options=options)

validate_examples_in_tfrecordが返すanomalous_example_statsは、 DatasetFeatureStatisticsListプロトコルバッファであり、各データセットは、特定の異常を示す一連の例で構成されています。これを使用して、特定の異常を示すデータセット内の例の数とそれらの例の特性を判別できます。

スキーマ環境

デフォルトでは、検証では、パイプライン内のすべてのデータセットが単一のスキーマに準拠していることを前提としています。場合によっては、わずかなスキーマのバリエーションを導入する必要があります。たとえば、ラベルとして使用される機能はトレーニング中に必要ですが(検証する必要があります)、提供中に欠落しています。

環境は、そのような要件を表現するために使用できます。特に、スキーマの機能は、default_environment、in_environment、およびnot_in_environmentを使用して一連の環境に関連付けることができます。

たとえば、ヒント機能がトレーニングのラベルとして使用されているが、提供データに欠落している場合です。環境を指定しないと、異常として表示されます。

    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    serving_anomalies = tfdv.validate_statistics(serving_stats, schema)

サービング異常のスクリーンショット

これを修正するには、すべての機能のデフォルト環境を「トレーニング」と「サービス」の両方に設定し、「ヒント」機能をサービス環境から除外する必要があります。

    # All features are by default in both TRAINING and SERVING environments.
    schema.default_environment.append('TRAINING')
    schema.default_environment.append('SERVING')

    # Specify that 'tips' feature is not in SERVING environment.
    tfdv.get_feature(schema, 'tips').not_in_environment.append('SERVING')

    serving_anomalies_with_env = tfdv.validate_statistics(
        serving_stats, schema, environment='SERVING')

データのスキューとドリフトのチェック

データセットがスキーマで設定された期待値に準拠しているかどうかを確認することに加えて、TFDVは以下を検出する機能も提供します。

  • トレーニングとデータの提供の間の偏り
  • トレーニングデータの異なる日の間のドリフト

TFDVは、スキーマで指定されたドリフト/スキューコンパレータに基づいて異なるデータセットの統計を比較することにより、このチェックを実行します。たとえば、トレーニングデータセットとサービスデータセット内の「payment_type」機能の間にスキューがあるかどうかを確認するには、次のようにします。

    # Assume we have already generated the statistics of training dataset, and
    # inferred a schema from it.
    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    # Add a skew comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering skew anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').skew_comparator.infinity_norm.threshold = 0.01
    skew_anomalies = tfdv.validate_statistics(
        statistics=train_stats, schema=schema, serving_statistics=serving_stats)

注数値特徴のスキューを検出するには、 skew_comparator infinity_normしきい値の代わりにjensen_shannon_divergenceしきい値を指定します。

データセットがスキーマで設定された期待値に準拠しているかどうかを確認するのと同じように、結果も異常プロトコルバッファーのインスタンスであり、トレーニングデータセットとサービスデータセットの間のスキューを記述します。たとえば、サービングデータに、値がCash機能payement_typeを持つ例が大幅に多いとすると、スキュー異常が発生します。

   payment_type  High L-infinity distance between serving and training  The L-infinity distance between serving and training is 0.0435984 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: Cash

異常がトレーニングデータとサービスデータの間に偏りがあることを本当に示している場合、モデルのパフォーマンスに直接影響を与える可能性があるため、さらに調査する必要があります。

サンプルノートブックには、スキューベースの異常をチェックする簡単な例が含まれています。

トレーニングデータの異なる日の間のドリフトの検出も同様の方法で行うことができます

    # Assume we have already generated the statistics of training dataset for
    # day 2, and inferred a schema from it.
    train_day1_stats = tfdv.generate_statistics_from_tfrecord(data_location=train_day1_data_path)
    # Add a drift comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering drift anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').drift_comparator.infinity_norm.threshold = 0.01
    drift_anomalies = tfdv.validate_statistics(
        statistics=train_day2_stats, schema=schema, previous_statistics=train_day1_stats)

注数値特徴のスキューを検出するには、 drift_comparator infinity_normしきい値の代わりにjensen_shannon_divergenceしきい値を指定します。

カスタムデータコネクタの作成

データ統計を計算するために、TFDVは、さまざまな形式の入力データを処理するためのいくつかの便利な方法を提供します(たとえば、 TFRecordTFRecord 、CSVなど)。データ形式がこのリストにない場合は、入力データを読み取るためのカスタムデータコネクタを作成し、データ統計を計算するためにTFDVコアAPIに接続する必要があります。

TFDVのデータの統計を計算するためのコアAPIは、あるビームPTransform入力例(入力例のバッチは次のように表されるのバッチのPCollectionかかり矢印RecordBatch)、および単一含有PCollection出力DatasetFeatureStatisticsListプロトコルバッファ。

入力例をArrowRecordBatchにバッチ処理するカスタムデータコネクタを実装したら、データ統計を計算するためにtfdv.GenerateStatisticsに接続する必要があります。 TFRecordtf.train.ExampleTFRecordtf.train.Exampleます。 tfx_bslTFExampleRecordデータコネクタを提供します。以下は、それをtfdv.GenerateStatisticsに接続する方法の例です。

import tensorflow_data_validation as tfdv
from tfx_bsl.public import tfxio
import apache_beam as beam
from tensorflow_metadata.proto.v0 import statistics_pb2

DATA_LOCATION = ''
OUTPUT_LOCATION = ''

with beam.Pipeline() as p:
    _ = (
    p
    # 1. Read and decode the data with tfx_bsl.
    | 'TFXIORead' >> (
          tfxio.TFExampleRecord(
              file_pattern=[DATA_LOCATION],
              telemetry_descriptors=['my', 'tfdv']).BeamSource())
    # 2. Invoke TFDV `GenerateStatistics` API to compute the data statistics.
    | 'GenerateStatistics' >> tfdv.GenerateStatistics()
    # 3. Materialize the generated data statistics.
    | 'WriteStatsOutput' >> WriteStatisticsToTFRecord(OUTPUT_LOCATION))

データのスライスに関する統計の計算

TFDVは、データのスライス全体の統計を計算するように構成できます。スライスは、Arrow RecordBatch 、フォームのタプルのシーケンス(slice key, record batch)を出力するスライス関数を提供することで有効にできます。 TFDVは、統計を計算するときにtfdv.StatsOptions一部として提供できる機能値ベースのスライス関数生成する簡単な方法を提供します。

スライスが有効になっている場合、出力DatasetFeatureStatisticsListプロトには、スライスごとに1つずつ、複数のDatasetFeatureStatisticsプロトが含まれます。各スライスは、DatasetFeatureStatisticsプロトタイプでデータセット名として設定されている一意の名前で識別されます。デフォルトでは、TFDVは、構成されたスライスに加えて、データセット全体の統計を計算します。

import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils import slicing_util

# Slice on country feature (i.e., every unique value of the feature).
slice_fn1 = slicing_util.get_feature_value_slicer(features={'country': None})

# Slice on the cross of country and state feature (i.e., every unique pair of
# values of the cross).
slice_fn2 = slicing_util.get_feature_value_slicer(
    features={'country': None, 'state': None})

# Slice on specific values of a feature.
slice_fn3 = slicing_util.get_feature_value_slicer(
    features={'age': [10, 50, 70]})

stats_options = tfdv.StatsOptions(
    slice_functions=[slice_fn1, slice_fn2, slice_fn3])