Ingénierie de fonctionnalités à l'aide de TFX Pipeline et de TensorFlow Transform

Transformez les données d'entrée et tracez un modèle avec un pipeline TFX.

Dans ce didacticiel basé sur un bloc-notes, nous allons créer et exécuter un pipeline TFX pour ingérer les données d'entrée brutes et les prétraiter de manière appropriée pour la formation ML. Ce portable est basé sur le pipeline de TFX nous avons construit dans la validation des données en utilisant TFX Pipeline et tensorflow Validation des données Tutoriel . Si vous ne l'avez pas encore lu, vous devriez le lire avant de continuer avec ce cahier.

Vous pouvez augmenter la qualité prédictive de vos données et/ou réduire la dimensionnalité grâce à l'ingénierie des caractéristiques. L'un des avantages de l'utilisation de TFX est que vous n'écrirez votre code de transformation qu'une seule fois, et les transformations résultantes seront cohérentes entre l'entraînement et la diffusion afin d'éviter le décalage entre l'entraînement et la diffusion.

Nous allons ajouter un Transform composant au pipeline. La mise en œuvre est le composant transformation à l' aide de la tf.transform bibliothèque.

S'il vous plaît voir Comprendre TFX Pipelines pour en savoir plus sur les différents concepts TFX.


Nous devons d'abord installer le package Python TFX et télécharger le jeu de données que nous utiliserons pour notre modèle.

Pip de mise à niveau

Pour éviter de mettre à niveau Pip dans un système lors de l'exécution locale, assurez-vous que nous exécutons dans Colab. Les systèmes locaux peuvent bien sûr être mis à niveau séparément.

  import colab
  !pip install --upgrade pip

Installer TFX

pip install -U tfx

As-tu redémarré le runtime ?

Si vous utilisez Google Colab, la première fois que vous exécutez la cellule ci-dessus, vous devez redémarrer le runtime en cliquant au-dessus du bouton "RESTART RUNTIME" ou en utilisant le menu "Runtime> Restart runtime ...". Cela est dû à la façon dont Colab charge les packages.

Vérifiez les versions TensorFlow et TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0

Configurer des variables

Certaines variables sont utilisées pour définir un pipeline. Vous pouvez personnaliser ces variables comme vous le souhaitez. Par défaut, toutes les sorties du pipeline seront générées sous le répertoire actuel.

import os

PIPELINE_NAME = "penguin-transform"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

Préparer des exemples de données

Nous allons télécharger l'exemple de jeu de données à utiliser dans notre pipeline TFX. L'ensemble de données que nous utilisons est dataset Palmer Penguins .

Cependant, contrairement à des tutoriels précédents qui ont utilisé un ensemble de données déjà prétraité, nous utiliserons l'ensemble des données brutes Palmer Penguins.

Étant donné que le composant TFX ExampleGen lit les entrées d'un répertoire, nous devons créer un répertoire et y copier l'ensemble de données.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_path = ''
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_path, _data_filepath)
('/tmp/tfx-dataacmxfq9f/data.csv', <http.client.HTTPMessage at 0x7f5b0ab1bf10>)

Jetez un coup d'œil à quoi ressemblent les données brutes.

head {_data_filepath}

Il y a quelques entrées avec les valeurs manquantes qui sont représentées par NA . Nous allons simplement supprimer ces entrées dans ce tutoriel.

sed -i '/\bNA\b/d' {_data_filepath}
head {_data_filepath}

Vous devriez être capable de voir sept caractéristiques qui décrivent les pingouins. Nous utiliserons le même ensemble de fonctionnalités que les didacticiels précédents - 'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g' - et prédirons l'"espèce" d'un pingouin.

La seule différence sera que les données d'entrée ne sont pas prétraitées. Notez que nous n'utiliserons pas d'autres fonctionnalités comme « île » ou « sexe » dans ce didacticiel.

Préparer un fichier de schéma

Comme décrit dans la validation des données en utilisant TFX Pipeline et tensorflow Validation des données didacticiel , nous avons besoin d' un fichier de schéma pour l'ensemble de données. Étant donné que le jeu de données est différent du didacticiel précédent, nous devons le générer à nouveau. Dans ce didacticiel, nous sauterons ces étapes et utiliserons simplement un fichier de schéma préparé.

import shutil

SCHEMA_PATH = 'schema'

_schema_uri = ''
_schema_filename = 'schema.pbtxt'
_schema_filepath = os.path.join(SCHEMA_PATH, _schema_filename)

os.makedirs(SCHEMA_PATH, exist_ok=True)
urllib.request.urlretrieve(_schema_uri, _schema_filepath)
('schema/schema.pbtxt', <http.client.HTTPMessage at 0x7f5b0ab20f50>)

Ce fichier de schéma a été créé avec le même pipeline que dans le didacticiel précédent sans aucune modification manuelle.

Créer un pipeline

Les pipelines TFX sont définis à l'aide d'API Python. Nous allons ajouter Transform le composant à la conduite que nous avons créé dans le tutoriel de validation de données .

Un composant de transformation nécessite des données d' entrée provenant d' un ExampleGen composant et un schéma d'un SchemaGen composant, et produit une « transformation graphique ». La sortie sera utilisée dans un Trainer composant. Transform peut éventuellement produire en plus des "données transformées", qui sont les données matérialisées après transformation. Cependant, nous allons transformer les données lors de la formation dans ce tutoriel sans matérialisation des données transformées intermédiaires.

Une chose à noter est que nous devons définir une fonction Python, preprocessing_fn pour décrire comment les données d'entrée doivent être transformées. Ceci est similaire à un composant Trainer qui nécessite également un code utilisateur pour la définition du modèle.

Écrire le code de prétraitement et d'entraînement

Nous devons définir deux fonctions Python. Un pour Transform et un pour Trainer.


Le composant Transformer trouvera une fonction nommée preprocessing_fn dans le fichier module donné que nous avons fait pour Trainer composant. Vous pouvez également spécifier une fonction spécifique à l' aide du preprocessing_fn paramètre du composant Transformer.

Dans cet exemple, nous allons effectuer deux types de transformation. Pour les fonctions numériques continues comme culmen_length_mm et body_mass_g , nous allons normaliser ces valeurs en utilisant la tft.scale_to_z_score fonction. Pour la fonction d'étiquette, nous devons convertir les étiquettes de chaîne en valeurs d'index numériques. Nous utiliserons tf.lookup.StaticHashTable pour la conversion.

Pour identifier les champs transformés facilement, nous ajoutons un _xf suffixe aux noms de fonctions transformées.


Le modèle lui-même est presque le même que dans les didacticiels précédents, mais cette fois, nous allons transformer les données d'entrée à l'aide du graphique de transformation du composant Transform.

Une autre différence importante par rapport au didacticiel précédent est que nous exportons maintenant un modèle pour le service qui inclut non seulement le graphique de calcul du modèle, mais également le graphique de transformation pour le prétraitement, qui est généré dans le composant Transform. Nous devons définir une fonction distincte qui sera utilisée pour répondre aux demandes entrantes. Vous pouvez voir que la même fonction _apply_preprocessing a été utilisé aussi bien pour des données de formation et la demande de service.

_module_file = ''
%%writefile {_module_file}

from typing import List, Text
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

# Specify features that we will use.
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
_LABEL_KEY = 'species'


# NEW: TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

    inputs: map from feature keys to raw not-yet-transformed features.

    Map from string feature key to transformed feature.
  outputs = {}

  # Uses features defined in _FEATURE_KEYS only.
  for key in _FEATURE_KEYS:
    # tft.scale_to_z_score computes the mean and variance of the given feature
    # and scales the output based on the result.
    outputs[key] = tft.scale_to_z_score(inputs[key])

  # For the label column we provide the mapping from string to index.
  # We could instead use `tft.compute_and_apply_vocabulary()` in order to
  # compute the vocabulary dynamically and perform a lookup.
  # Since in this example there are only 3 possible values, we use a hard-coded
  # table for simplicity.
  table_keys = ['Adelie', 'Chinstrap', 'Gentoo']
  initializer = tf.lookup.KeyValueTensorInitializer(
      values=tf.cast(tf.range(len(table_keys)), tf.int64),
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  outputs[_LABEL_KEY] = table.lookup(inputs[_LABEL_KEY])

  return outputs

# NEW: This function will apply the same transform operation to training data
#      and serving requests.
def _apply_preprocessing(raw_features, tft_layer):
  transformed_features = tft_layer(raw_features)
  if _LABEL_KEY in raw_features:
    transformed_label = transformed_features.pop(_LABEL_KEY)
    return transformed_features, transformed_label
    return transformed_features, None

# NEW: This function will create a handler function which gets a serialized
#      tf.example, preprocess and run an inference with it.
def _get_serve_tf_examples_fn(model, tf_transform_output):
  # We must save the tft_layer to the model to ensure its assets are kept and
  # tracked.
  model.tft_layer = tf_transform_output.transform_features_layer()

      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  def serve_tf_examples_fn(serialized_tf_examples):
    # Expected input is a string which is serialized tf.Example format.
    feature_spec = tf_transform_output.raw_feature_spec()
    # Because input schema includes unnecessary fields like 'species' and
    # 'island', we filter feature_spec to include required keys only.
    required_feature_spec = {
        k: v for k, v in feature_spec.items() if k in _FEATURE_KEYS
    parsed_features =,

    # Preprocess parsed input with transform operation defined in
    # preprocessing_fn().
    transformed_features, _ = _apply_preprocessing(parsed_features,
    # Run inference with ML model.
    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) ->
  """Generates features and label for tuning/training.

    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  dataset = data_accessor.tf_dataset_factory(

  transform_layer = tf_transform_output.transform_features_layer()
  def apply_transform(raw_features):
    return _apply_preprocessing(raw_features, transform_layer)


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

    A Keras Model.
  # The model below is built with Functional API, please refer to
  # for all API options.
  inputs = [
      keras.layers.Input(shape=(1,), name=key)
      for key in _FEATURE_KEYS
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  return model

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

    fn_args: Holds args used to train the model as name/value pairs.
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
  eval_dataset = _input_fn(

  model = _build_keras_model()

  # NEW: Save a computation graph including transform layer.
  signatures = {
      'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
  }, save_format='tf', signatures=signatures)

Vous avez maintenant terminé toutes les étapes de préparation pour créer un pipeline TFX.

Écrire une définition de pipeline

Nous définissons une fonction pour créer un pipeline TFX. Un Pipeline objet représente un pipeline de TFX, qui peut être exécuté en utilisant l' un des systèmes d'orchestration de pipeline de supports de TFX.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(

  # Import the schema.
  schema_importer = tfx.dsl.Importer(

  # Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(

  # NEW: Transforms input data using preprocessing_fn in the 'module_file'.
  transform = tfx.components.Transform(

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(

      # NEW: Pass transform_graph to the trainer.


  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(

  components = [

      transform,  # NEW: Transform component was added to the pipeline.


  return tfx.dsl.Pipeline(

Exécuter le pipeline

Nous utiliserons LocalDagRunner comme dans le tutoriel précédent.

Vous devriez voir « INFO:absl:Component Pusher terminé. si le pipeline s'est terminé avec succès.

Le composant poussoir pousse le modèle formé à l' SERVING_MODEL_DIR qui est le serving_model/penguin-transform répertoire si vous ne modifiez pas les variables dans les étapes précédentes. Vous pouvez voir le résultat à partir du navigateur de fichiers dans le panneau de gauche de Colab, ou à l'aide de la commande suivante :

# List files in created model directory.

Vous pouvez également vérifier la signature du modèle généré en utilisant l' saved_model_cli outil .

saved_model_cli show --dir {SERVING_MODEL_DIR}/$(ls -1 {SERVING_MODEL_DIR} | sort -nr | head -1) --tag_set serve --signature_def serving_default
The given SavedModel SignatureDef contains the following input(s):
  inputs['examples'] tensor_info:
      dtype: DT_STRING
      shape: (-1)
      name: serving_default_examples:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['output_0'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 3)
      name: StatefulPartitionedCall_2:0
Method name is: tensorflow/serving/predict

Parce que nous avons défini serving_default avec notre propre serve_tf_examples_fn fonction, les spectacles de signature qu'il prend une seule chaîne. Cette chaîne est une chaîne sérialisée de tf.Examples et sera analysé avec le () fonction que nous avons défini plus tôt ( en savoir plus sur tf.Examples ici ).

Nous pouvons charger le modèle exporté et essayer quelques inférences avec quelques exemples.

# Find a model with the latest timestamp.
model_dirs = (item for item in os.scandir(SERVING_MODEL_DIR) if item.is_dir())
model_path = max(model_dirs, key=lambda i: int(

loaded_model = tf.keras.models.load_model(model_path)
inference_fn = loaded_model.signatures['serving_default']
WARNING:tensorflow:Inconsistent references when loading the checkpoint into this object graph. Either the Trackable object references in the Python program have changed in an incompatible way, or the checkpoint was generated in an incompatible program.

Two checkpoint references resolved to different objects (<keras.saving.saved_model.load.TensorFlowTransform>TransformFeaturesLayer object at 0x7f5b0836e3d0> and <keras.engine.input_layer.InputLayer object at 0x7f5b091aa550>).
WARNING:tensorflow:Inconsistent references when loading the checkpoint into this object graph. Either the Trackable object references in the Python program have changed in an incompatible way, or the checkpoint was generated in an incompatible program.

Two checkpoint references resolved to different objects (<keras.saving.saved_model.load.TensorFlowTransform>TransformFeaturesLayer object at 0x7f5b0836e3d0> and <keras.engine.input_layer.InputLayer object at 0x7f5b091aa550>).
# Prepare an example and run inference.
features = {
  'culmen_length_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[49.9])),
  'culmen_depth_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[16.1])),
  'flipper_length_mm': tf.train.Feature(int64_list=tf.train.Int64List(value=[213])),
  'body_mass_g': tf.train.Feature(int64_list=tf.train.Int64List(value=[5400])),
example_proto = tf.train.Example(features=tf.train.Features(feature=features))
examples = example_proto.SerializeToString()

result = inference_fn(examples=tf.constant([examples]))
[[-2.5357873 -3.0600576  3.4993587]]

Le troisième élément, qui correspond à l'espèce « Gentoo », devrait être le plus grand des trois.

Prochaines étapes

Si vous voulez en savoir plus sur Transformer composant, consultez Transformer Guide des composants . Vous pouvez trouver plus de ressources sur

S'il vous plaît voir Comprendre TFX Pipelines pour en savoir plus sur les différents concepts TFX.