Walidacja danych za pomocą TFX Pipeline i TensorFlow Data Validation

W tym samouczku opartym na notatniku utworzymy i uruchomimy potoki TFX w celu sprawdzenia poprawności danych wejściowych i utworzenia modelu ML. Notebook ten jest oparty na rurociągu TFX my zbudowany w prosty TFX Pipeline Tutorial . Jeśli jeszcze nie przeczytałeś tego samouczka, powinieneś go przeczytać przed kontynuowaniem tego notatnika.

Pierwszym zadaniem w każdym projekcie data science lub ML jest zrozumienie i wyczyszczenie danych, co obejmuje:

  • Zrozumienie typów danych, rozkładów i innych informacji (np. wartości średniej lub liczby unikalnych) na temat każdej cechy
  • Generowanie wstępnego schematu opisującego dane
  • Identyfikacja anomalii i braków danych w danych w odniesieniu do danego schematu

W tym samouczku stworzymy dwa potoki TFX.

Najpierw utworzymy potok do analizy zestawu danych i wygenerowania wstępnego schematu danego zestawu danych. Rurociąg ten obejmie dwa nowe komponenty, StatisticsGen i SchemaGen .

Po uzyskaniu odpowiedniego schematu danych utworzymy potok do trenowania modelu klasyfikacji ML na podstawie potoku z poprzedniego samouczka. W tym rurociągiem, będziemy używać schematu z pierwszego rurociągu oraz nowy składnik, ExampleValidator , do sprawdzania poprawności danych wejściowych.

Trzy nowe komponenty, StatisticsGen, SchemaGen i ExampleValidator, są składnikami TFX do analizy danych i walidacji, i są one realizowane za pomocą TensorFlow Data Validation bibliotekę.

Proszę zobaczyć Zrozumienie TFX Rurociągi , aby dowiedzieć się więcej na temat różnych koncepcji w TFX.


Najpierw musimy zainstalować pakiet TFX Python i pobrać zestaw danych, którego użyjemy dla naszego modelu.

Ulepsz Pip

Aby uniknąć aktualizacji Pip w systemie, gdy działa on lokalnie, upewnij się, że działamy w Colab. Systemy lokalne można oczywiście aktualizować oddzielnie.

  import colab
  !pip install --upgrade pip

Zainstaluj TFX

pip install -U tfx

Czy uruchomiłeś ponownie środowisko wykonawcze?

Jeśli korzystasz z Google Colab, przy pierwszym uruchomieniu powyższej komórki musisz ponownie uruchomić środowisko wykonawcze, klikając powyżej przycisk „RESTART RUNTIME” lub korzystając z menu „Runtime > Restart runtime...”. Wynika to ze sposobu, w jaki Colab ładuje paczki.

Sprawdź wersje TensorFlow i TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0

Ustaw zmienne

Istnieje kilka zmiennych używanych do zdefiniowania potoku. Możesz dostosować te zmienne, jak chcesz. Domyślnie wszystkie dane wyjściowe z potoku będą generowane w bieżącym katalogu.

import os

# We will create two pipelines. One for schema generation and one for training.
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

Przygotuj przykładowe dane

Pobierzemy przykładowy zestaw danych do użycia w naszym potoku TFX. Zbiór danych używamy jest Palmer Penguins zestaw danych , który jest również stosowany w innych przykładach TFX .

Ten zbiór danych zawiera cztery funkcje liczbowe:

  • culmen_length_mm
  • culmen_depth_mm
  • flipper_length_mm
  • body_mass_g

Wszystkie cechy zostały już znormalizowane do zakresu [0,1]. Będziemy budować model klasyfikacji który przewiduje species pingwinów.

Ponieważ komponent TFX ExampleGen odczytuje dane wejściowe z katalogu, musimy utworzyć katalog i skopiować do niego zbiór danych.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
('/tmp/tfx-datan3p7t1d2/data.csv', <http.client.HTTPMessage at 0x7f8d2f9f9110>)

Rzuć okiem na plik CSV.

head {_data_filepath}

Powinieneś zobaczyć pięć kolumn funkcji. species jest jednym z 0, 1 lub 2, a wszystkie inne cechy powinny mieć wartości pomiędzy 0 a 1. Stworzymy rurociągu TFX analizy zbioru danych.

Wygeneruj wstępny schemat

Potoki TFX są definiowane za pomocą interfejsów API Pythona. Stworzymy potok, aby automatycznie wygenerować schemat z przykładów wejściowych. Ten schemat może być przeglądany przez człowieka i dostosowywany w razie potrzeby. Po sfinalizowaniu schematu można go wykorzystać do szkolenia i przykładowej walidacji w późniejszych zadaniach.

Oprócz CsvExampleGen który jest stosowany w prosty TFX Pipeline tutoriala użyjemy StatisticsGen i SchemaGen :

  • StatisticsGen oblicza statystyki dla zbioru danych.
  • SchemaGen analizuje statystyki i tworzy początkowy schemat danych.

Zobacz prowadnice dla każdego komponentu lub komponentów TFX samouczka , aby dowiedzieć się więcej na temat tych składników.

Napisz definicję potoku

Definiujemy funkcję do tworzenia potoku TFX. Pipeline przedmiot stanowi rurociąg TFX, który może być uruchamiany za pomocą jednego z systemów instrumentacji gazociągu TFX podpór.

def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline for schema generation."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Computes statistics over data for visualization and schema generation.
  statistics_gen = tfx.components.StatisticsGen(

  # NEW: Generates schema based on the generated statistics.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  components = [

  return tfx.dsl.Pipeline(

Uruchom rurociąg

Użyjemy LocalDagRunner jak w poprzednim ćwiczeniu.

Powinien pojawić się komunikat „INFO:absl:Component SchemaGen został zakończony”. jeśli potok zakończył się pomyślnie.

Zbadamy dane wyjściowe potoku, aby zrozumieć nasz zestaw danych.

Przejrzyj wyniki rurociągu

Jak wyjaśniono w poprzednim ćwiczeniu rurociąg TFX wytwarza dwa rodzaje wyjściowych, artefakty i metadanych dB (MLMD) zawierającego metadane przedmiotów i wykonywanie rurowych. Zdefiniowaliśmy lokalizację tych wyjść w powyższych komórkach. Domyślnie artefakty są przechowywane pod pipelines katalogu i metadane są przechowywane w bazie danych sqlite pod metadata katalogu.

Za pomocą interfejsów API MLMD można programowo lokalizować te dane wyjściowe. Najpierw zdefiniujemy kilka funkcji narzędziowych do wyszukiwania artefaktów wyjściowych, które właśnie zostały utworzone.

from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id,

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
    if visualization:

from tfx.orchestration.experimental.interactive import standard_visualizations

Teraz możemy zbadać dane wyjściowe z wykonania potoku.

# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]
INFO:absl:MetadataStore with DB connection initialized

Nadszedł czas na zbadanie wyników z każdego komponentu. Jak opisano powyżej, Tensorflow poprawności danych (TFDV) stosuje się StatisticsGen i SchemaGen i TFDV również wizualizację wyjść z tych składników.

W tym samouczku użyjemy metod pomocniczych wizualizacji w TFX, które wewnętrznie używają TFDV do wyświetlania wizualizacji.

Sprawdź dane wyjściowe z StatisticsGen

# docs-infra: no-execute

Możesz zobaczyć różne statystyki dla danych wejściowych. Statystyki te są dostarczane do SchemaGen skonstruować wstępny schemat danych automatycznie.

Sprawdź dane wyjściowe ze SchemaGen


Ten schemat jest automatycznie wywnioskowany z danych wyjściowych StatisticsGen. Powinieneś być w stanie zobaczyć 4 funkcje FLOAT i 1 funkcję INT.

Wyeksportuj schemat do wykorzystania w przyszłości

Musimy przejrzeć i udoskonalić wygenerowany schemat. Przejrzany schemat musi być utrwalony, aby mógł być używany w kolejnych potokach na potrzeby uczenia modelu ML. Innymi słowy, możesz chcieć dodać plik schematu do systemu kontroli wersji dla rzeczywistych przypadków użycia. W tym samouczku dla uproszczenia po prostu skopiujemy schemat do predefiniowanej ścieżki systemu plików.

import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SCHEMA_PATH)

Plik schematu wykorzystuje format tekstowy protokół Buffer oraz instancję TensorFlow Metadane Schema proto .

print(f'Schema at {SCHEMA_PATH}-----')
!cat {SCHEMA_PATH}/*
Schema at schema-----
feature {
  name: "body_mass_g"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "flipper_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1
feature {
  name: "species"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  shape {
    dim {
      size: 1

W razie potrzeby należy przejrzeć i ewentualnie edytować definicję schematu. W tym samouczku użyjemy wygenerowanego schematu w niezmienionej postaci.

Weryfikuj przykłady danych wejściowych i trenuj model ML

Będziemy wracać do rurociągu, który stworzyliśmy w prosty TFX Pipeline Tutorial , trenować model ML i użyć wygenerowanego schematu do pisania kodu modelu szkolenia.

Będziemy również dodać ExampleValidator komponent, który będzie szukał anomalii i brakujących wartości w zbiorze danych przychodzących w odniesieniu do schematu.

Napisz kod szkolenia modelu

Musimy napisać kod modelu jak my w prosty TFX Pipeline Tutorial .

Sam model jest taki sam jak w poprzednim samouczku, ale tym razem użyjemy schematu wygenerowanego z poprzedniego potoku zamiast ręcznego określania funkcji. Większość kodu nie została zmieniona. Jedyną różnicą jest to, że nie musimy określać nazw i typów funkcji w tym pliku. Zamiast tego, czytamy je z pliku schematu.

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

# We don't need to specify _FEATURE_KEYS and _FEATURE_SPEC any more.
# Those information can be read from the given schema file.

_LABEL_KEY = 'species'


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  return data_accessor.tf_dataset_factory(
          batch_size=batch_size, label_key=_LABEL_KEY),

def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

    A Keras Model.
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.

  # ++ Changed code: Uses all features in the schema except the label.
  feature_keys = [f.name for f in schema.feature if f.name != _LABEL_KEY]
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]
  # ++ End of the changed code.

  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  return model

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

    fn_args: Holds args used to train the model as name/value pairs.

  # ++ Changed code: Reads in schema file passed to the Trainer component.
  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
  # ++ End of the changed code.

  train_dataset = _input_fn(
  eval_dataset = _input_fn(

  model = _build_keras_model(schema)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Teraz wykonałeś wszystkie kroki przygotowawcze, aby zbudować potok TFX na potrzeby uczenia modelu.

Napisz definicję potoku

Dodamy dwa nowe komponenty, Importer i ExampleValidator . Importer wprowadza zewnętrzny plik do potoku TFX. W tym przypadku jest to plik zawierający definicję schematu. ExampleValidator zbada dane wejściowe i sprawdzi, czy wszystkie dane wejściowe są zgodne z podanym przez nas schematem danych.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a pipeline using predefined schema with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(

  # NEW: Import the schema.
  schema_importer = tfx.dsl.Importer(

  # NEW: Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      schema=schema_importer.outputs['result'],  # Pass the imported schema.

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(

  components = [

      # NEW: Following three components were added to the pipeline.


  return tfx.dsl.Pipeline(

Uruchom rurociąg

Powinieneś zobaczyć "INFO:absl:Component Pusher jest gotowy." jeśli potok zakończył się pomyślnie.

Zbadaj wyjścia rurociągu

Przeszkoliliśmy model klasyfikacji dla pingwinów, a także zweryfikowaliśmy przykłady danych wejściowych w komponencie ExampleValidator. Możemy przeanalizować dane wyjściowe z ExampleValidator, tak jak zrobiliśmy to w poprzednim potoku.

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(

with Metadata(metadata_connection_config) as metadata_handler:
  ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
  anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]
INFO:absl:MetadataStore with DB connection initialized

Przykładowe anomalie z ExampleValidator można również wizualizować.


Powinieneś zobaczyć „Nie znaleziono anomalii” dla każdego fragmentu przykładów. Ponieważ użyliśmy tych samych danych, które były używane do generowania schematu w tym potoku, nie oczekuje się tutaj żadnej anomalii. Jeśli ten potok zostanie uruchomiony wielokrotnie z nowymi danymi przychodzącymi, ExampleValidator powinien być w stanie znaleźć wszelkie rozbieżności między nowymi danymi a istniejącym schematem.

Jeśli zostaną znalezione jakiekolwiek anomalie, możesz przejrzeć swoje dane, aby sprawdzić, czy jakiekolwiek przykłady nie są zgodne z Twoimi założeniami. Przydatne mogą być dane wyjściowe z innych komponentów, takich jak StatisticsGen. Jednak wszelkie znalezione anomalie NIE będą blokować dalszych wykonań potoku.

Następne kroki

Można znaleźć więcej środków na https://www.tensorflow.org/tfx/tutorials

Proszę zobaczyć Zrozumienie TFX Rurociągi , aby dowiedzieć się więcej na temat różnych koncepcji w TFX.