Unisciti alla comunità SIG TFX-Addons e contribuisci a rendere TFX ancora migliore!
Questa pagina è stata tradotta dall'API Cloud Translation.
Switch to English

Convalida dei dati utilizzando TFX Pipeline e TensorFlow Data Validation

In questo tutorial basato su notebook, creeremo ed eseguiremo pipeline TFX per convalidare i dati di input e creare un modello ML. Questo notebook è basato sulla pipeline TFX che abbiamo creato in Simple TFX Pipeline Tutorial . Se non hai ancora letto quel tutorial, dovresti leggerlo prima di procedere con questo taccuino.

La prima attività in qualsiasi progetto di data science o ML è comprendere e pulire i dati, che include:

  • Comprendere i tipi di dati, le distribuzioni e altre informazioni (ad esempio, valore medio o numero di utenti unici) su ciascuna caratteristica
  • Generazione di uno schema preliminare che descrive i dati
  • Identificazione di anomalie e valori mancanti nei dati rispetto allo schema dato

In questo tutorial creeremo due pipeline TFX.

Innanzitutto, creeremo una pipeline per analizzare il set di dati e generare uno schema preliminare del set di dati specificato. Questa pipeline includerà due nuovi componenti, StatisticsGen e SchemaGen .

Una volta che avremo uno schema appropriato dei dati, creeremo una pipeline per addestrare un modello di classificazione ML basato sulla pipeline del tutorial precedente. In questa pipeline, useremo lo schema della prima pipeline e un nuovo componente, ExampleValidator , per convalidare i dati di input.

I tre nuovi componenti, StatisticsGen, SchemaGen ed ExampleValidator, sono componenti TFX per l'analisi e la convalida dei dati e vengono implementati utilizzando la libreria TensorFlow Data Validation .

Vedere Comprensione delle pipeline TFX per saperne di più sui vari concetti in TFX.

Impostare

Per prima cosa dobbiamo installare il pacchetto TFX Python e scaricare il set di dati che useremo per il nostro modello.

Aggiorna Pip

Per evitare di aggiornare Pip in un sistema quando è in esecuzione localmente, verificare che sia in esecuzione in Colab. I sistemi locali possono ovviamente essere aggiornati separatamente.

try:
  import colab
  !pip install -q --upgrade pip
except:
  pass

Installa TFX

# TODO(b/178712706): Stop using legacy resolver after PIP issue is resolved.
pip install -q -U --use-deprecated=legacy-resolver tfx==0.27.0

Hai riavviato il runtime?

Se stai utilizzando Google Colab, la prima volta che esegui la cella sopra, devi riavviare il runtime facendo clic sul pulsante "RESTART RUNTIME" sopra o utilizzando il menu "Runtime> Restart runtime ...". Ciò è dovuto al modo in cui Colab carica i pacchetti.

Controlla le versioni TensorFlow e TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
import tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.4.1
TFX version: 0.27.0

Imposta le variabili

Esistono alcune variabili utilizzate per definire una pipeline. Puoi personalizzare queste variabili come desideri. Per impostazione predefinita, tutto l'output dalla pipeline verrà generato nella directory corrente.

import os

# We will create two pipelines. One for schema generation and one for training.
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

# Output directory to store artifacts generated from the pipeline.
SCHEMA_PIPELINE_ROOT = os.path.join('pipelines', SCHEMA_PIPELINE_NAME)
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME,
                                    'metadata.db')
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

Prepara i dati di esempio

Scaricheremo il set di dati di esempio da utilizzare nella nostra pipeline TFX. Il set di dati che stiamo utilizzando è il set di dati Palmer Penguins che viene utilizzato anche in altri esempi TFX .

Ci sono quattro caratteristiche numeriche in questo set di dati:

  • culmen_length_mm
  • culmen_depth_mm
  • flipper_length_mm
  • body_mass_g

Tutte le caratteristiche erano già state normalizzate per avere un intervallo [0,1]. Costruiremo un modello di classificazione che predice le species di pinguini.

Poiché il componente TFX ExampleGen legge gli input da una directory, è necessario creare una directory e copiarvi il set di dati.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
('/tmp/tfx-datacjdu22yh/data.csv', <http.client.HTTPMessage at 0x7f93259774a8>)

Dai una rapida occhiata al file CSV.

head {_data_filepath}
species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
0,0.29818181818181805,0.5833333333333334,0.3898305084745763,0.1527777777777778
0,0.16727272727272732,0.7380952380952381,0.3559322033898305,0.20833333333333334
0,0.26181818181818167,0.892857142857143,0.3050847457627119,0.2638888888888889
0,0.24727272727272717,0.5595238095238096,0.15254237288135594,0.2569444444444444
0,0.25818181818181823,0.773809523809524,0.3898305084745763,0.5486111111111112
0,0.32727272727272727,0.5357142857142859,0.1694915254237288,0.1388888888888889
0,0.23636363636363636,0.9642857142857142,0.3220338983050847,0.3055555555555556

Dovresti essere in grado di vedere cinque colonne di funzionalità. species è uno tra 0, 1 o 2 e tutte le altre caratteristiche dovrebbero avere valori compresi tra 0 e 1. Creeremo una pipeline TFX per analizzare questo set di dati.

Genera uno schema preliminare

Le pipeline TFX vengono definite utilizzando le API Python. Creeremo una pipeline per generare automaticamente uno schema dagli esempi di input. Questo schema può essere rivisto da un essere umano e adattato secondo necessità. Una volta finalizzato, lo schema può essere utilizzato per l'addestramento e la convalida di esempi nelle attività successive.

Oltre a CsvExampleGen che viene utilizzato in Simple TFX Pipeline Tutorial , useremo StatisticsGen e SchemaGen :

  • StatisticsGen calcola le statistiche per il set di dati.
  • SchemaGen esamina le statistiche e crea uno schema di dati iniziale.

Consulta le guide per ogni componente o il tutorial sui componenti TFX per saperne di più su questi componenti.

Scrivi una definizione della pipeline

Definiamo una funzione per creare una pipeline TFX. Un oggetto Pipeline rappresenta una pipeline TFX che può essere eseguita utilizzando uno dei sistemi di orchestrazione della pipeline supportati da TFX.

from tfx.components import CsvExampleGen
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.orchestration import metadata
from tfx.orchestration import pipeline

def _create_schema_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     metadata_path: str) -> pipeline.Pipeline:
  """Creates a pipeline for schema generation."""
  # Brings data into the pipeline.
  example_gen = CsvExampleGen(input_base=data_root)

  # NEW: Computes statistics over data for visualization and schema generation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  # NEW: Generates schema based on the generated statistics.
  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  components = [
      example_gen,
      statistics_gen,
      schema_gen,
  ]

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path),
      components=components)
INFO:absl:tensorflow_ranking is not available: No module named 'tensorflow_ranking'
INFO:absl:tensorflow_text is not available: No module named 'tensorflow_text'
WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.

Esegui la pipeline

Useremo LocalDagRunner come nel tutorial precedente.

import os
from tfx.orchestration.local import local_dag_runner

local_dag_runner.LocalDagRunner().run(
  _create_schema_pipeline(
      pipeline_name=SCHEMA_PIPELINE_NAME,
      pipeline_root=SCHEMA_PIPELINE_ROOT,
      data_root=DATA_ROOT,
      metadata_path=SCHEMA_METADATA_PATH))
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running driver for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:Running executor for CsvExampleGen
INFO:absl:Generating examples.
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
INFO:absl:Processing input csv data /tmp/tfx-datacjdu22yh/* to TFExample.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Running publisher for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running driver for StatisticsGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for StatisticsGen
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to pipelines/penguin-tfdv-schema/StatisticsGen/statistics/2/train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to pipelines/penguin-tfdv-schema/StatisticsGen/statistics/2/eval.
INFO:absl:Running publisher for StatisticsGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running driver for SchemaGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for SchemaGen
INFO:absl:Processing schema from statistics for split train.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_data_validation/utils/stats_util.py:247: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version.
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_data_validation/utils/stats_util.py:247: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version.
Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to pipelines/penguin-tfdv-schema/SchemaGen/schema/3/schema.pbtxt.
INFO:absl:Running publisher for SchemaGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Dovresti vedere "INFO: absl: Component SchemaGen è finito". se la pipeline è terminata correttamente.

Esamineremo l'output della pipeline per comprendere il nostro set di dati.

Esamina i risultati della pipeline

Come spiegato nel tutorial precedente, una pipeline TFX produce due tipi di output, artefatti e un database di metadati (MLMD) che contiene metadati di artefatti ed esecuzioni di pipeline. Abbiamo definito la posizione di questi output nelle celle sopra. Per impostazione predefinita, gli artefatti vengono archiviati nella directory delle pipelines ei metadati vengono archiviati come database sqlite nella directory dei metadata .

È possibile utilizzare le API MLMD per individuare questi output in modo programmatico. Per prima cosa, definiremo alcune funzioni di utilità per cercare gli artefatti di output che sono stati appena prodotti.

from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.portable.mlmd import execution_lib
from tfx.types import standard_component_specs

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_executions(metadata, pipeline_name):
  """Gets the execution objects for the latest run of the pipeline."""
  run_contexts = [
        c for c in metadata.store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
    ]
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return metadata.store.get_executions_by_context(latest_context.id)

def get_artifacts_for_component_id(metadata, executions, component_id):
  for execution in executions:
    if execution.properties['component_id'].string_value == component_id:
      return execution_lib.get_artifacts_dict(metadata, execution.id,
                                              metadata_store_pb2.Event.OUTPUT)
  raise RuntimeError(f'Execution not found for {component_id}')

from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

from tfx.types import standard_artifacts

Ora possiamo esaminare gli output dall'esecuzione della pipeline.

metadata_connection_config = metadata.sqlite_metadata_connection_config(
              SCHEMA_METADATA_PATH)

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search component executions from the previous pipeline run.
  executions = get_latest_executions(metadata_handler, SCHEMA_PIPELINE_NAME)

  # Find output artifacts from MLMD.
  stat_gen_output = get_artifacts_for_component_id(metadata_handler, executions,
                                                   'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_artifacts_for_component_id(metadata_handler,
                                                     executions, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]
INFO:absl:MetadataStore with DB connection initialized

È il momento di esaminare i risultati di ogni componente. Come descritto sopra, Tensorflow Data Validation (TFDV) viene utilizzato in StatisticsGen e SchemaGen e TFDV fornisce anche la visualizzazione degli output di questi componenti.

In questo tutorial, utilizzeremo i metodi di supporto della visualizzazione in TFX che utilizzano TFDV internamente per mostrare la visualizzazione.

Esamina l'output di StatisticsGen

visualize_artifacts(stats_artifacts)

È possibile visualizzare varie statistiche per i dati di input. Queste statistiche vengono fornite a SchemaGen per costruire automaticamente uno schema iniziale di dati.

Esamina l'output di SchemaGen

visualize_artifacts(schema_artifacts)

Questo schema viene dedotto automaticamente dall'output di StatisticsGen. Dovresti essere in grado di vedere 4 funzioni FLOAT e 1 funzione INT.

Esporta lo schema per un utilizzo futuro

Dobbiamo rivedere e perfezionare lo schema generato. Lo schema revisionato deve essere mantenuto per essere utilizzato nelle pipeline successive per l'addestramento del modello ML. In altre parole, potresti voler aggiungere il file dello schema al tuo sistema di controllo della versione per casi d'uso reali. In questo tutorial, copieremo semplicemente lo schema in un percorso di file system predefinito per semplicità.

import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SCHEMA_PATH)
'schema/schema.pbtxt'

Il file dello schema utilizza il formato di testo Protocol Buffer e un'istanza del protocollo TensorFlow Metadata Schema .

print(f'Schema at {SCHEMA_PATH}-----')
!cat {SCHEMA_PATH}/*
Schema at schema-----
feature {
  name: "body_mass_g"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "flipper_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "species"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}

Assicurati di rivedere ed eventualmente modificare la definizione dello schema secondo necessità. In questo tutorial, useremo semplicemente lo schema generato invariato.

Convalida esempi di input e addestra un modello ML

Torneremo alla pipeline che abbiamo creato in Simple TFX Pipeline Tutorial , per addestrare un modello ML e utilizzare lo schema generato per scrivere il codice di addestramento del modello.

Aggiungeremo anche un componente ExampleValidator che cercherà anomalie e valori mancanti nel set di dati in arrivo rispetto allo schema.

Scrivi il codice di addestramento del modello

Dobbiamo scrivere il codice del modello come abbiamo fatto in Simple TFX Pipeline Tutorial .

Il modello stesso è lo stesso del tutorial precedente, ma questa volta useremo lo schema generato dalla pipeline precedente invece di specificare le caratteristiche manualmente. La maggior parte del codice non è stata modificata. L'unica differenza è che non è necessario specificare i nomi e i tipi di funzionalità in questo file. Invece, li leggiamo dal file dello schema .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx.utils import io_utils
from tfx_bsl.tfxio import dataset_options
from tensorflow_metadata.proto.v0 import schema_pb2

# We don't need to specify _FEATURE_KEYS and _FEATURE_SPEC any more.
# Those information can be read from the given schema file.

_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

def _input_fn(file_pattern: List[str],
              data_accessor: DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.

  # ++ Changed code: Uses all features in the schema except the label.
  feature_keys = [f.name for f in schema.feature if f.name != _LABEL_KEY]
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]
  # ++ End of the changed code.

  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # ++ Changed code: Reads in schema file passed to the Trainer component.
  schema = io_utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
  # ++ End of the changed code.

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model(schema)
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Ora hai completato tutti i passaggi di preparazione per creare una pipeline TFX per l'addestramento del modello.

Scrivi una definizione della pipeline

Aggiungeremo due nuovi componenti, Importer e ExampleValidator . L'importatore porta un file esterno nella pipeline TFX. In questo caso, è un file contenente la definizione dello schema. ExampleValidator esaminerà i dati di input e convaliderà se tutti i dati di input sono conformi allo schema di dati che abbiamo fornito.

from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.components.base import executor_spec
from tfx.dsl.components.common.importer import Importer
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import standard_artifacts


def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> pipeline.Pipeline:
  """Creates a pipeline using predefined schema with TFX."""
  # Brings data into the pipeline.
  example_gen = CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  # NEW: Import the schema.
  schema_importer = Importer(
      instance_name='import_schema',
      source_uri=schema_path,
      artifact_type=standard_artifacts.Schema)

  # NEW: Performs anomaly detection based on statistics and data schema.
  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_importer.outputs['result'])

  # Uses user-provided Python function that trains a model.
  trainer = Trainer(
      module_file=module_file,
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=example_gen.outputs['examples'],
      schema=schema_importer.outputs['result'],  # Pass the imported schema.
      train_args=trainer_pb2.TrainArgs(num_steps=100),
      eval_args=trainer_pb2.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = Pusher(
      model=trainer.outputs['model'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,

      # NEW: Following three components were added to the pipeline.
      statistics_gen,
      schema_importer,
      example_validator,

      trainer,
      pusher,
  ]

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path),
      components=components)

Esegui la pipeline

import os
from tfx.orchestration.local import local_dag_runner

local_dag_runner.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      schema_path=SCHEMA_PATH,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))
INFO:absl:Excluding no splits because exclude_splits is not set.
WARNING:absl:`instance_name` is deprecated, please set the node id directly using `with_id()` or the `.id` setter.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running driver for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:Running executor for CsvExampleGen
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /tmp/tfx-datacjdu22yh/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Running publisher for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component Importer.import_schema is running.
INFO:absl:Running driver for Importer.import_schema
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Processing source uri: schema, properties: {}, custom_properties: {}
INFO:absl:Running executor for Importer.import_schema
INFO:absl:Running publisher for Importer.import_schema
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Importer.import_schema is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running driver for StatisticsGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for StatisticsGen
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to pipelines/penguin-tfdv/StatisticsGen/statistics/3/train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to pipelines/penguin-tfdv/StatisticsGen/statistics/3/eval.
INFO:absl:Running publisher for StatisticsGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component Trainer is running.
INFO:absl:Running driver for Trainer
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for Trainer
INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:Loading penguin_trainer.py because it has not been loaded before.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Model: "model"
INFO:absl:__________________________________________________________________________________________________
INFO:absl:Layer (type)                    Output Shape         Param #     Connected to                     
INFO:absl:==================================================================================================
INFO:absl:body_mass_g (InputLayer)        [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:culmen_depth_mm (InputLayer)    [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:culmen_length_mm (InputLayer)   [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:flipper_length_mm (InputLayer)  [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:concatenate (Concatenate)       (None, 4)            0           body_mass_g[0][0]                
INFO:absl:                                                                 culmen_depth_mm[0][0]            
INFO:absl:                                                                 culmen_length_mm[0][0]           
INFO:absl:                                                                 flipper_length_mm[0][0]          
INFO:absl:__________________________________________________________________________________________________
INFO:absl:dense (Dense)                   (None, 8)            40          concatenate[0][0]                
INFO:absl:__________________________________________________________________________________________________
INFO:absl:dense_1 (Dense)                 (None, 8)            72          dense[0][0]                      
INFO:absl:__________________________________________________________________________________________________
INFO:absl:dense_2 (Dense)                 (None, 3)            27          dense_1[0][0]                    
INFO:absl:==================================================================================================
INFO:absl:Total params: 139
INFO:absl:Trainable params: 139
INFO:absl:Non-trainable params: 0
INFO:absl:__________________________________________________________________________________________________
100/100 [==============================] - 2s 9ms/step - loss: 0.7374 - sparse_categorical_accuracy: 0.7421 - val_loss: 0.2698 - val_sparse_categorical_accuracy: 0.9600
INFO:tensorflow:Assets written to: pipelines/penguin-tfdv/Trainer/model/4/serving_model_dir/assets
INFO:tensorflow:Assets written to: pipelines/penguin-tfdv/Trainer/model/4/serving_model_dir/assets
INFO:absl:Training complete. Model written to pipelines/penguin-tfdv/Trainer/model/4/serving_model_dir. ModelRun written to pipelines/penguin-tfdv/Trainer/model_run/4
INFO:absl:Running publisher for Trainer
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component ExampleValidator is running.
INFO:absl:Running driver for ExampleValidator
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for ExampleValidator
INFO:absl:Validating schema against the computed statistics for split train.
INFO:absl:Validation complete for split train. Anomalies written to pipelines/penguin-tfdv/ExampleValidator/anomalies/5/train.
INFO:absl:Validating schema against the computed statistics for split eval.
INFO:absl:Validation complete for split eval. Anomalies written to pipelines/penguin-tfdv/ExampleValidator/anomalies/5/eval.
INFO:absl:Running publisher for ExampleValidator
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component ExampleValidator is finished.
INFO:absl:Component Pusher is running.
INFO:absl:Running driver for Pusher
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for Pusher
WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.
INFO:absl:Model version: 1619774279
INFO:absl:Model written to serving path serving_model/penguin-tfdv/1619774279.
INFO:absl:Model pushed to pipelines/penguin-tfdv/Pusher/pushed_model/6.
INFO:absl:Running publisher for Pusher
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Pusher is finished.

Dovresti vedere "INFO: absl: Component Pusher è finito." se la pipeline è terminata correttamente.

Esaminare gli output della pipeline

Abbiamo addestrato il modello di classificazione per i pinguini e abbiamo anche convalidato gli esempi di input nel componente ExampleValidator. Possiamo analizzare l'output di ExampleValidator come abbiamo fatto con la pipeline precedente.

metadata_connection_config = metadata.sqlite_metadata_connection_config(
              METADATA_PATH)

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search component executions from the previous pipeline run.
  executions = get_latest_executions(metadata_handler, PIPELINE_NAME)

  # Find output artifacts from MLMD.
  stat_gen_output = get_artifacts_for_component_id(metadata_handler, executions,
                                                   'ExampleValidator')
  anomalies_artifacts = stat_gen_output[standard_component_specs.ANOMALIES_KEY]
INFO:absl:MetadataStore with DB connection initialized

ExampleAnomalies da ExampleValidator possono anche essere visualizzate.

visualize_artifacts(anomalies_artifacts)

Dovresti vedere "Nessuna anomalia trovata" per ogni suddivisione di esempi. Poiché abbiamo utilizzato gli stessi dati utilizzati per la generazione dello schema in questa pipeline, non è prevista alcuna anomalia qui. Se esegui ripetutamente questa pipeline con nuovi dati in arrivo, ExampleValidator dovrebbe essere in grado di trovare eventuali discrepanze tra i nuovi dati e lo schema esistente.

Se sono state rilevate anomalie, puoi rivedere i tuoi dati per verificare se alcuni esempi non seguono le tue ipotesi. Gli output di altri componenti come StatisticsGen potrebbero essere utili. Tuttavia, qualsiasi anomalia rilevata NON bloccherà ulteriori esecuzioni della pipeline.

Prossimi passi

Puoi trovare ulteriori risorse su https://www.tensorflow.org/tfx/tutorials

Vedere Comprensione delle pipeline TFX per saperne di più sui vari concetti in TFX.