Treten Sie der SIG TFX-Addons-Community bei und helfen Sie, TFX noch besser zu machen!

Beginnen Sie mit der Tensorflow-Datenvalidierung

Tensorflow Data Validation (TFDV) kann Trainings- und Serving-Daten analysieren, um:

Die Kern-API unterstützt jede Funktionalität mit praktischen Methoden, die darauf aufbauen und im Kontext von Notebooks aufgerufen werden können.

Berechnung deskriptiver Datenstatistiken

TFDV kann beschreibende Statistiken berechnen, die einen schnellen Überblick über die Daten hinsichtlich der vorhandenen Merkmale und der Formen ihrer Wertverteilungen bieten. Tools wie Facettenübersicht bieten eine prägnante Visualisierung dieser Statistiken für ein einfaches Durchsuchen.

Angenommen, der path verweist auf eine Datei im TFRecord Format (das Datensätze vom Typ tensorflow.Example . tensorflow.Example ). Das folgende Snippet veranschaulicht die Berechnung von Statistiken mit TFDV:

    stats = tfdv.generate_statistics_from_tfrecord(data_location=path)

Der zurückgegebene Wert ist ein DatasetFeatureStatisticsList- Protokollpuffer. Das Beispielnotizbuch enthält eine Visualisierung der Statistiken mithilfe der Facettenübersicht :

    tfdv.visualize_statistics(stats)

Screenshot der Statistikvisualisierung

Im vorherigen Beispiel wird davon TFRecord dass die Daten in einer TFRecord Datei gespeichert sind. TFDV unterstützt auch das CSV-Eingabeformat mit Erweiterbarkeit für andere gängige Formate. Die verfügbaren Datendecoder finden Sie hier . Darüber hinaus bietet tfdv.generate_statistics_from_dataframe Dienstprogrammfunktion tfdv.generate_statistics_from_dataframe für Benutzer mit speicherinternen Daten, die als Pandas-Datenrahmen dargestellt werden.

Neben der Berechnung eines Standardsatzes von Datenstatistiken kann TFDV auch Statistiken für semantische Domänen (z. B. Bilder, Text) berechnen. Übergeben Sie zum Aktivieren der Berechnung semantischer Domänenstatistiken ein tfdv.StatsOptions- Objekt, wobei enable_semantic_domain_stats auf True gesetzt ist, und tfdv.generate_statistics_from_tfrecord .

Wird in Google Cloud ausgeführt

Intern verwendet TFDV das datenparallele Verarbeitungsframework von Apache Beam , um die Berechnung von Statistiken über große Datenmengen zu skalieren. Für Anwendungen, die eine tiefere Integration in TFDV wünschen (z. B. Statistikgenerierung am Ende einer Datengenerierungspipeline anhängen, Statistiken für Daten in benutzerdefiniertem Format generieren ), stellt die API auch eine Beam-PTransform für die Statistikgenerierung bereit.

Um TFDV in Google Cloud auszuführen, muss die TFDV-Raddatei heruntergeladen und den Dataflow-Mitarbeitern zur Verfügung gestellt werden. Laden Sie die Raddatei wie folgt in das aktuelle Verzeichnis herunter:

pip download tensorflow_data_validation \
  --no-deps \
  --platform manylinux2010_x86_64 \
  --only-binary=:all:

Das folgende Snippet zeigt ein Beispiel für die Verwendung von TFDV in Google Cloud:


import tensorflow_data_validation as tfdv
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

PROJECT_ID = ''
JOB_NAME = ''
GCS_STAGING_LOCATION = ''
GCS_TMP_LOCATION = ''
GCS_DATA_LOCATION = ''
# GCS_STATS_OUTPUT_PATH is the file path to which to output the data statistics
# result.
GCS_STATS_OUTPUT_PATH = ''

PATH_TO_WHL_FILE = ''


# Create and set your PipelineOptions.
options = PipelineOptions()

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = GCS_STAGING_LOCATION
google_cloud_options.temp_location = GCS_TMP_LOCATION
options.view_as(StandardOptions).runner = 'DataflowRunner'

setup_options = options.view_as(SetupOptions)
# PATH_TO_WHL_FILE should point to the downloaded tfdv wheel file.
setup_options.extra_packages = [PATH_TO_WHL_FILE]

tfdv.generate_statistics_from_tfrecord(GCS_DATA_LOCATION,
                                       output_path=GCS_STATS_OUTPUT_PATH,
                                       pipeline_options=options)

In diesem Fall wird die erzeugte Statistiken Proto in einer TFRecord - Datei geschrieben gespeichert GCS_STATS_OUTPUT_PATH .

HINWEIS Wenn Sie eine der Funktionen tfdv.generate_statistics_... (z. B. tfdv.generate_statistics_from_tfrecord ) in Google Cloud tfdv.generate_statistics_from_tfrecord , müssen Sie einen output_path . Die Angabe von Keine kann einen Fehler verursachen.

Ableiten eines Schemas über die Daten

Das Schema beschreibt die erwarteten Eigenschaften der Daten. Einige dieser Eigenschaften sind:

  • Welche Funktionen werden voraussichtlich vorhanden sein?
  • ihre Art
  • Die Anzahl der Werte für ein Feature in jedem Beispiel
  • das Vorhandensein jedes Merkmals in allen Beispielen
  • die erwarteten Bereiche von Funktionen.

Kurz gesagt, das Schema beschreibt die Erwartungen für "korrekte" Daten und kann somit verwendet werden, um Fehler in den Daten zu erkennen (nachstehend beschrieben). Darüber hinaus kann dasselbe Schema verwendet werden, um die Tensorflow-Transformation für Datentransformationen einzurichten. Beachten Sie, dass das Schema voraussichtlich ziemlich statisch ist, z. B. können mehrere Datensätze demselben Schema entsprechen, während die oben beschriebenen Statistiken je nach Datensatz variieren können.

Da das Schreiben eines Schemas eine mühsame Aufgabe sein kann, insbesondere für Datensätze mit vielen Funktionen, bietet TFDV eine Methode zum Generieren einer ersten Version des Schemas basierend auf den beschreibenden Statistiken:

    schema = tfdv.infer_schema(stats)

Im Allgemeinen verwendet TFDV konservative Heuristiken, um stabile Dateneigenschaften aus den Statistiken abzuleiten, um eine Überanpassung des Schemas an den spezifischen Datensatz zu vermeiden. Es wird dringend empfohlen, das abgeleitete Schema zu überprüfen und nach Bedarf zu verfeinern , um Domänenwissen über die Daten zu erfassen, die die Heuristiken von TFDV möglicherweise übersehen haben.

Standardmäßig tfdv.infer_schema folgert die Form jedes erforderliches Merkmal, wenn value_count.min gleich value_count.max für die Funktion. Setzen Sie das Argument infer_feature_shape auf False, um die infer_feature_shape zu deaktivieren.

Das Schema selbst wird als Schema-Protokollpuffer gespeichert und kann daher mithilfe der Standard-Protokollpuffer-API aktualisiert / bearbeitet werden. TFDV bietet auch einige Dienstprogrammmethoden , um diese Updates zu vereinfachen. Angenommen, das Schema enthält die folgende payment_type , um ein erforderliches Zeichenfolgenfeature payment_type zu beschreiben, das einen einzelnen Wert annimmt:

feature {
  name: "payment_type"
  value_count {
    min: 1
    max: 1
  }
  type: BYTES
  domain: "payment_type"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}

Um zu markieren, dass das Feature in mindestens 50% der Beispiele ausgefüllt sein sollte:

    tfdv.get_feature(schema, 'payment_type').presence.min_fraction = 0.5

Das Beispielnotizbuch enthält eine einfache Visualisierung des Schemas als Tabelle, in der jedes Feature und seine Hauptmerkmale aufgelistet sind, wie sie im Schema codiert sind.

Screenshot der Schema-Visualisierung

Überprüfen der Daten auf Fehler

Bei einem gegebenen Schema kann überprüft werden, ob ein Datensatz den im Schema festgelegten Erwartungen entspricht oder ob Datenanomalien vorliegen. Sie können Ihre Daten auf Fehler überprüfen (a) im Aggregat über einen gesamten Datensatz hinweg, indem Sie die Statistiken des Datensatzes mit dem Schema abgleichen, oder (b) indem Sie anhand von Beispielen auf Fehler prüfen.

Abgleichen der Statistiken des Datasets mit einem Schema

Um das Aggregat auf Fehler zu überprüfen, vergleicht TFDV die Statistiken des Datasets mit dem Schema und markiert etwaige Unstimmigkeiten. Beispielsweise:

    # Assume that other_path points to another TFRecord file
    other_stats = tfdv.generate_statistics_from_tfrecord(data_location=other_path)
    anomalies = tfdv.validate_statistics(statistics=other_stats, schema=schema)

Das Ergebnis ist eine Instanz des Anomalies- Protokollpuffers und beschreibt alle Fehler, bei denen die Statistiken nicht mit dem Schema übereinstimmen. Angenommen, die Daten unter other_path enthalten Beispiele mit Werten für das Feature payment_type außerhalb der im Schema angegebenen Domäne.

Dies führt zu einer Anomalie

   payment_type  Unexpected string values  Examples contain values missing from the schema: Prcard (<1%).

Dies zeigt an, dass in den Statistiken in <1% der Feature-Werte ein Wert außerhalb der Domäne gefunden wurde.

Wenn dies erwartet wurde, kann das Schema wie folgt aktualisiert werden:

   tfdv.get_domain(schema, 'payment_type').value.append('Prcard')

Wenn die Anomalie tatsächlich auf einen Datenfehler hinweist, sollten die zugrunde liegenden Daten behoben werden, bevor sie für das Training verwendet werden.

Die verschiedenen Anomalietypen, die von diesem Modul erkannt werden können, sind hier aufgelistet.

Das Beispielnotizbuch enthält eine einfache Visualisierung der Anomalien als Tabelle, in der die Merkmale aufgeführt sind, in denen Fehler erkannt werden, sowie eine kurze Beschreibung jedes Fehlers.

Screenshot von Anomalien

Beispielweise auf Fehler prüfen

TFDV bietet auch die Möglichkeit, Daten anhand eines Beispiels zu validieren, anstatt datensatzweite Statistiken mit dem Schema zu vergleichen. TFDV bietet Funktionen zum Überprüfen von Daten auf Beispielbasis und zum Generieren von zusammenfassenden Statistiken für die gefundenen anomalen Beispiele. Beispielsweise:

   options = tfdv.StatsOptions(schema=schema)
   anomalous_example_stats = tfdv.validate_examples_in_tfrecord(
       data_location=input, stats_options=options)

Die anomalous_example_stats , die validate_examples_in_tfrecord zurückgeben, sind ein DatasetFeatureStatisticsList- Protokollpuffer, in dem jedes Dataset aus einer Reihe von Beispielen besteht, die eine bestimmte Anomalie aufweisen. Sie können dies verwenden, um die Anzahl der Beispiele in Ihrem Datensatz zu bestimmen, die eine bestimmte Anomalie aufweisen, sowie die Eigenschaften dieser Beispiele.

Schemaumgebungen

Standardmäßig wird bei Überprüfungen davon ausgegangen, dass alle Datasets in einer Pipeline einem einzelnen Schema entsprechen. In einigen Fällen ist die Einführung geringfügiger Schemaabweichungen erforderlich. Beispielsweise sind während des Trainings als Beschriftung verwendete Funktionen erforderlich (und sollten validiert werden), die jedoch während des Servierens fehlen.

Umgebungen können verwendet werden, um solche Anforderungen auszudrücken. Insbesondere können Features im Schema mit einer Reihe von Umgebungen verknüpft werden, indem default_environment, in_environment und not_in_environment verwendet werden.

Zum Beispiel, wenn die Tippfunktion im Training als Etikett verwendet wird, aber in den Serving-Daten fehlt. Ohne Angabe der Umgebung wird dies als Anomalie angezeigt.

    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    serving_anomalies = tfdv.validate_statistics(serving_stats, schema)

Screenshot von Serving-Anomalien

Um dies zu beheben, müssen wir die Standardumgebung für alle Funktionen auf "TRAINING" und "SERVING" festlegen und die Funktion "Tipps" aus der SERVING-Umgebung ausschließen.

    # All features are by default in both TRAINING and SERVING environments.
    schema.default_environment.append('TRAINING')
    schema.default_environment.append('SERVING')

    # Specify that 'tips' feature is not in SERVING environment.
    tfdv.get_feature(schema, 'tips').not_in_environment.append('SERVING')

    serving_anomalies_with_env = tfdv.validate_statistics(
        serving_stats, schema, environment='SERVING')

Überprüfen von Datenversatz und -drift

TFDV prüft nicht nur, ob ein Datensatz den im Schema festgelegten Erwartungen entspricht, sondern bietet auch folgende Funktionen:

  • Versatz zwischen Trainings- und Serving-Daten
  • Drift zwischen verschiedenen Tagen der Trainingsdaten

TFDV führt diese Prüfung durch, indem es die Statistiken verschiedener Datensätze basierend auf den im Schema angegebenen Drift / Skew-Komparatoren vergleicht. So überprüfen Sie beispielsweise, ob zwischen der Funktion "Zahlungstyp" im Training und dem Serving-Datensatz ein Versatz besteht:

    # Assume we have already generated the statistics of training dataset, and
    # inferred a schema from it.
    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    # Add a skew comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering skew anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').skew_comparator.infinity_norm.threshold = 0.01
    skew_anomalies = tfdv.validate_statistics(
        statistics=train_stats, schema=schema, serving_statistics=serving_stats)

HINWEIS Zur Erkennung von Skew für numerische Merkmale, eine angeben jensen_shannon_divergence Schwelle anstelle eine infinity_norm Schwelle im skew_comparator .

Wie bei der Überprüfung, ob ein Datensatz den im Schema festgelegten Erwartungen entspricht, ist das Ergebnis auch eine Instanz des Anomalies- Protokollpuffers und beschreibt etwaige Abweichungen zwischen dem Trainings- und dem Serving-Datensatz. Angenommen, die Serving-Daten enthalten wesentlich mehr Beispiele mit dem Merkmal payement_type mit dem Wert Cash . Dies führt zu einer Versatzanomalie

   payment_type  High L-infinity distance between serving and training  The L-infinity distance between serving and training is 0.0435984 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: Cash

Wenn die Anomalie tatsächlich auf eine Abweichung zwischen Trainings- und Serving-Daten hinweist, sind weitere Untersuchungen erforderlich, da dies einen direkten Einfluss auf die Modellleistung haben könnte.

Das Beispielnotizbuch enthält ein einfaches Beispiel für die Überprüfung auf schiefbasierte Anomalien.

Das Erkennen von Abweichungen zwischen verschiedenen Tagen mit Trainingsdaten kann auf ähnliche Weise erfolgen

    # Assume we have already generated the statistics of training dataset for
    # day 2, and inferred a schema from it.
    train_day1_stats = tfdv.generate_statistics_from_tfrecord(data_location=train_day1_data_path)
    # Add a drift comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering drift anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').drift_comparator.infinity_norm.threshold = 0.01
    drift_anomalies = tfdv.validate_statistics(
        statistics=train_day2_stats, schema=schema, previous_statistics=train_day1_stats)

HINWEIS Zur Erkennung von Skew für numerische Merkmale, eine angeben jensen_shannon_divergence Schwelle anstelle eine infinity_norm Schwelle im drift_comparator .

Benutzerdefinierten Datenconnector schreiben

Um Datenstatistiken zu berechnen, bietet TFDV verschiedene praktische Methoden zum Verarbeiten von Eingabedaten in verschiedenen Formaten (z. B. TFRecord von tf.train.Example , CSV usw.). Wenn Ihr Datenformat nicht in dieser Liste enthalten ist, müssen Sie einen benutzerdefinierten Datenconnector zum Lesen von Eingabedaten schreiben und ihn mit der TFDV-Kern-API zur Berechnung von Datenstatistiken verbinden.

Die TFDV- Kern-API zum Berechnen von Datenstatistiken ist eine Beam-PTransform , die eine PC-Sammlung von Stapeln von Eingabebeispielen (ein Stapel von Eingabebeispielen wird als Arrow RecordBatch dargestellt) verwendet und eine PCollection ausgibt, die einen einzelnen DatasetFeatureStatisticsList Protokollpuffer enthält.

Nachdem Sie den benutzerdefinierten Datenconnector implementiert haben, der Ihre Eingabebeispiele in einem Arrow RecordBatch stapelt, müssen Sie ihn mit der tfdv.GenerateStatistics API verbinden, um die tfdv.GenerateStatistics zu berechnen. Nehmen TFRecord zum Beispiel TFRecord von tf.train.Example . tfx_bsl stellt den TFExampleRecord- tfx_bsl bereit. Im Folgenden finden Sie ein Beispiel für die Verbindung mit der tfdv.GenerateStatistics API.

import tensorflow_data_validation as tfdv
from tfx_bsl.public import tfxio
import apache_beam as beam
from tensorflow_metadata.proto.v0 import statistics_pb2

DATA_LOCATION = ''
OUTPUT_LOCATION = ''

with beam.Pipeline() as p:
    _ = (
    p
    # 1. Read and decode the data with tfx_bsl.
    | 'TFXIORead' >> (
          tfxio.TFExampleRecord(
              file_pattern=[DATA_LOCATION],
              telemetry_descriptors=['my', 'tfdv']).BeamSource())
    # 2. Invoke TFDV `GenerateStatistics` API to compute the data statistics.
    | 'GenerateStatistics' >> tfdv.GenerateStatistics()
    # 3. Materialize the generated data statistics.
    | 'WriteStatsOutput' >> WriteStatisticsToTFRecord(OUTPUT_LOCATION))

Berechnen von Statistiken über Datenscheiben

TFDV kann so konfiguriert werden, dass Statistiken über Datenscheiben berechnet werden. Das Slicing kann aktiviert werden, indem Slicing-Funktionen bereitgestellt werden, die einen Arrow RecordBatch und eine Folge von Formulartupeln (slice key, record batch) ausgeben. TFDV bietet eine einfache Möglichkeit, auf Feature-Werten basierende Slicing-Funktionen zu generieren , die als Teil von tfdv.StatsOptions bei der Berechnung von Statistiken bereitgestellt werden können.

Wenn das Slicing aktiviert ist, enthält das Ausgabeprotokoll DatasetFeatureStatisticsList mehrere Protos DatasetFeatureStatistics , eines für jedes Slice. Jedes Slice wird durch einen eindeutigen Namen identifiziert, der im DatasetFeatureStatistics-Proto als Dataset-Name festgelegt wird . Standardmäßig berechnet TFDV zusätzlich zu den konfigurierten Slices Statistiken für den gesamten Datensatz.

import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils import slicing_util

# Slice on country feature (i.e., every unique value of the feature).
slice_fn1 = slicing_util.get_feature_value_slicer(features={'country': None})

# Slice on the cross of country and state feature (i.e., every unique pair of
# values of the cross).
slice_fn2 = slicing_util.get_feature_value_slicer(
    features={'country': None, 'state': None})

# Slice on specific values of a feature.
slice_fn3 = slicing_util.get_feature_value_slicer(
    features={'age': [10, 50, 70]})

stats_options = tfdv.StatsOptions(
    slice_functions=[slice_fn1, slice_fn2, slice_fn3])