Unisciti alla comunità SIG TFX-Addons e contribuisci a rendere TFX ancora migliore!
Questa pagina è stata tradotta dall'API Cloud Translation.
Switch to English

Tutorial semplice sulla pipeline TFX utilizzando il set di dati Penguin

Un breve tutorial per eseguire una semplice pipeline TFX.

In questo tutorial basato su notebook, creeremo ed eseguiremo una pipeline TFX per un semplice modello di classificazione. La pipeline sarà composta da tre componenti TFX essenziali: ExampleGen, Trainer e Pusher. La pipeline include il flusso di lavoro ML minimo come l'importazione di dati, l'addestramento di un modello e l'esportazione del modello addestrato.

Vedere Comprensione delle pipeline TFX per saperne di più sui vari concetti in TFX.

Impostare

Per prima cosa dobbiamo installare il pacchetto TFX Python e scaricare il set di dati che useremo per il nostro modello.

Aggiorna Pip

Per evitare di aggiornare Pip in un sistema quando è in esecuzione localmente, verificare che sia in esecuzione in Colab. I sistemi locali possono ovviamente essere aggiornati separatamente.

try:
  import colab
  !pip install -q --upgrade pip
except:
  pass

Installa TFX

# TODO(b/178712706): Stop using legacy resolver after PIP issue is resolved.
pip install -q -U --use-deprecated=legacy-resolver tfx

Hai riavviato il runtime?

Se stai utilizzando Google Colab, la prima volta che esegui la cella sopra, devi riavviare il runtime facendo clic sopra il pulsante "RESTART RUNTIME" o utilizzando il menu "Runtime> Restart runtime ...". Ciò è dovuto al modo in cui Colab carica i pacchetti.

Controlla le versioni TensorFlow e TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
import tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.4.1
TFX version: 0.27.0

Imposta le variabili

Esistono alcune variabili utilizzate per definire una pipeline. Puoi personalizzare queste variabili come desideri. Per impostazione predefinita, tutto l'output dalla pipeline verrà generato nella directory corrente.

import os

PIPELINE_NAME = "penguin-simple"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

Prepara i dati di esempio

Scaricheremo il set di dati di esempio da utilizzare nella nostra pipeline TFX. Il set di dati che stiamo utilizzando è il set di dati Palmer Penguins che viene utilizzato anche in altri esempi TFX .

Ci sono quattro caratteristiche numeriche in questo set di dati:

  • culmen_length_mm
  • culmen_depth_mm
  • flipper_length_mm
  • body_mass_g

Tutte le caratteristiche erano già state normalizzate per avere un intervallo [0,1]. Costruiremo un modello di classificazione che predice le species di pinguini.

Poiché TFX ExampleGen legge gli input da una directory, è necessario creare una directory e copiarvi il set di dati.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
('/tmp/tfx-data8oul1c7e/data.csv', <http.client.HTTPMessage at 0x7f20f9900208>)

Dai una rapida occhiata al file CSV.

head {_data_filepath}
species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
0,0.29818181818181805,0.5833333333333334,0.3898305084745763,0.1527777777777778
0,0.16727272727272732,0.7380952380952381,0.3559322033898305,0.20833333333333334
0,0.26181818181818167,0.892857142857143,0.3050847457627119,0.2638888888888889
0,0.24727272727272717,0.5595238095238096,0.15254237288135594,0.2569444444444444
0,0.25818181818181823,0.773809523809524,0.3898305084745763,0.5486111111111112
0,0.32727272727272727,0.5357142857142859,0.1694915254237288,0.1388888888888889
0,0.23636363636363636,0.9642857142857142,0.3220338983050847,0.3055555555555556

Dovresti essere in grado di vedere cinque valori. species è uno tra 0, 1 o 2 e tutte le altre caratteristiche devono avere valori compresi tra 0 e 1.

Crea una pipeline

Le pipeline TFX vengono definite utilizzando le API Python. Definiremo una pipeline che consiste dei seguenti tre componenti.

  • CsvExampleGen: legge i file di dati e li converte nel formato interno TFX per ulteriori elaborazioni. Esistono più ExampleGen per vari formati. In questo tutorial, useremo CsvExampleGen che accetta l'input del file CSV.
  • Trainer: addestra un modello ML. Il componente Trainer richiede un codice di definizione del modello da parte degli utenti. Puoi utilizzare le API TensorFlow per specificare come addestrare un modello e salvarlo in un formato di modello _saved.
  • Pusher: copia il modello addestrato al di fuori della pipeline TFX. Il componente Pusher può essere pensato come un processo di distribuzione del modello ML addestrato.

Prima di definire effettivamente la pipeline, è necessario scrivere prima un codice modello per il componente Trainer.

Scrivi il codice di addestramento del modello

Creeremo un semplice modello DNN per la classificazione utilizzando TensorFlow Keras API. Questo codice di addestramento del modello verrà salvato in un file separato.

In questo tutorial useremo Generic Trainer di TFX che supporta i modelli basati su Keras. È necessario scrivere un file Python contenente la funzione run_fn , che è il punto di ingresso per il componente Trainer .

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Ora hai completato tutti i passaggi di preparazione per creare una pipeline TFX.

Scrivi una definizione della pipeline

Definiamo una funzione per creare una pipeline TFX. Un oggetto Pipeline rappresenta una pipeline TFX che può essere eseguita utilizzando uno dei sistemi di orchestrazione della pipeline supportati da TFX.

from tfx.components import CsvExampleGen
from tfx.components import Pusher
from tfx.components import Trainer
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.components.base import executor_spec
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> pipeline.Pipeline:
  """Creates a three component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = Trainer(
      module_file=module_file,
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=example_gen.outputs['examples'],
      train_args=trainer_pb2.TrainArgs(num_steps=100),
      eval_args=trainer_pb2.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = Pusher(
      model=trainer.outputs['model'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path),
      components=components)
INFO:absl:tensorflow_ranking is not available: No module named 'tensorflow_ranking'
INFO:absl:tensorflow_text is not available: No module named 'tensorflow_text'
WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.

Esegui la pipeline

TFX supporta più orchestratori per eseguire pipeline. In questo tutorial useremo LocalDagRunner che è incluso nel pacchetto TFX Python ed esegue pipeline in ambiente locale. Spesso chiamiamo le pipeline TFX "DAG" che sta per grafo aciclico diretto.

LocalDagRunner fornisce iterazioni veloci per lo sviluppo e il debug. TFX supporta anche altri orchestrator tra cui Kubeflow Pipelines e Apache Airflow che sono adatti per casi d'uso di produzione.

Vedi TFX on Cloud AI Platform Pipelines o TFX Airflow Tutorial per saperne di più su altri sistemi di orchestrazione.

Ora creiamo un LocalDagRunner e passiamo un oggetto Pipeline creato dalla funzione che abbiamo già definito.

La pipeline viene eseguita direttamente ed è possibile visualizzare i log per lo stato di avanzamento della pipeline, incluso l'addestramento del modello ML.

import os
from tfx.orchestration.local import local_dag_runner

local_dag_runner.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))
INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running driver for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:Running executor for CsvExampleGen
INFO:absl:Generating examples.
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features.
INFO:absl:Processing input csv data /tmp/tfx-data8oul1c7e/* to TFExample.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Running publisher for CsvExampleGen
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component Trainer is running.
INFO:absl:Running driver for Trainer
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for Trainer
INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:Loading penguin_trainer.py because it has not been loaded before.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Model: "model"
INFO:absl:__________________________________________________________________________________________________
INFO:absl:Layer (type)                    Output Shape         Param #     Connected to                     
INFO:absl:==================================================================================================
INFO:absl:culmen_length_mm (InputLayer)   [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:culmen_depth_mm (InputLayer)    [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:flipper_length_mm (InputLayer)  [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:body_mass_g (InputLayer)        [(None, 1)]          0                                            
INFO:absl:__________________________________________________________________________________________________
INFO:absl:concatenate (Concatenate)       (None, 4)            0           culmen_length_mm[0][0]           
INFO:absl:                                                                 culmen_depth_mm[0][0]            
INFO:absl:                                                                 flipper_length_mm[0][0]          
INFO:absl:                                                                 body_mass_g[0][0]                
INFO:absl:__________________________________________________________________________________________________
INFO:absl:dense (Dense)                   (None, 8)            40          concatenate[0][0]                
INFO:absl:__________________________________________________________________________________________________
INFO:absl:dense_1 (Dense)                 (None, 8)            72          dense[0][0]                      
INFO:absl:__________________________________________________________________________________________________
INFO:absl:dense_2 (Dense)                 (None, 3)            27          dense_1[0][0]                    
INFO:absl:==================================================================================================
INFO:absl:Total params: 139
INFO:absl:Trainable params: 139
INFO:absl:Non-trainable params: 0
INFO:absl:__________________________________________________________________________________________________
100/100 [==============================] - 1s 6ms/step - loss: 0.7571 - sparse_categorical_accuracy: 0.6395 - val_loss: 0.1308 - val_sparse_categorical_accuracy: 0.9600
INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/serving_model_dir/assets
INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/serving_model_dir/assets
INFO:absl:Training complete. Model written to pipelines/penguin-simple/Trainer/model/2/serving_model_dir. ModelRun written to pipelines/penguin-simple/Trainer/model_run/2
INFO:absl:Running publisher for Trainer
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component Pusher is running.
INFO:absl:Running driver for Pusher
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for Pusher
WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline.
INFO:absl:Model version: 1615285163
INFO:absl:Model written to serving path serving_model/penguin-simple/1615285163.
INFO:absl:Model pushed to pipelines/penguin-simple/Pusher/pushed_model/3.
INFO:absl:Running publisher for Pusher
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Pusher is finished.

Dovresti vedere "INFO: absl: Component Pusher è finito." alla fine dei log se la pipeline è terminata correttamente. Perché il componente Pusher è l'ultimo componente della pipeline.

Il componente pusher SERVING_MODEL_DIR il modello addestrato a SERVING_MODEL_DIR che è la serving_model/penguin-simple se non hai modificato le variabili nei passaggi precedenti. Puoi vedere il risultato dal browser di file nel pannello di sinistra in Colab, o usando il seguente comando:

# List files in created model directory.
find {SERVING_MODEL_DIR}
serving_model/penguin-simple
serving_model/penguin-simple/1615285163
serving_model/penguin-simple/1615285163/variables
serving_model/penguin-simple/1615285163/variables/variables.data-00000-of-00001
serving_model/penguin-simple/1615285163/variables/variables.index
serving_model/penguin-simple/1615285163/assets
serving_model/penguin-simple/1615285163/saved_model.pb

Prossimi passi

Puoi trovare ulteriori risorse su https://www.tensorflow.org/tfx/tutorials

Vedere Comprensione delle pipeline TFX per saperne di più sui vari concetti in TFX.