Разработка функций с использованием TFX Pipeline и TensorFlow Transform

Преобразование входных данных и обучение модели с помощью конвейера TFX.

В этом руководстве на основе записной книжки мы создадим и запустим конвейер TFX для приема необработанных входных данных и их предварительной обработки для обучения машинному обучению. Этот ноутбук на основе TFX трубопровода мы построили в проверке данных с использованием TFX трубопроводов и TensorFlow проверки данных Учебного пособия . Если вы еще не читали эту книгу, вам следует прочитать ее, прежде чем продолжить работу с этой записной книжкой.

Вы можете повысить прогнозируемое качество ваших данных и / или уменьшить размерность с помощью проектирования функций. Одним из преимуществ использования TFX является то, что вы напишете свой код преобразования один раз, и полученные преобразования будут согласованы между обучением и обслуживанием, чтобы избежать перекоса при обучении / обслуживании.

Мы добавим Transform компонент трубопровода. Transform компонент реализован с использованием tf.transform библиотеки.

См Понимание TFX трубопроводов более узнать о различных концепциях в TFX.


Сначала нам нужно установить пакет TFX Python и загрузить набор данных, который мы будем использовать для нашей модели.

Обновить Pip

Чтобы избежать обновления Pip в системе при локальном запуске, убедитесь, что мы работаем в Colab. Конечно, локальные системы можно модернизировать отдельно.

  import colab
  !pip install --upgrade pip

Установить TFX

pip install -U tfx

Вы перезапускали среду выполнения?

Если вы используете Google Colab, при первом запуске указанной выше ячейки вы должны перезапустить среду выполнения, нажав кнопку «ПЕРЕЗАГРУЗИТЬ ВРЕМЯ» выше или используя меню «Время выполнения> Перезапустить среду выполнения ...». Это связано с тем, как Colab загружает пакеты.

Проверьте версии TensorFlow и TFX.

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.6.2
TFX version: 1.4.0

Настроить переменные

Для определения конвейера используются некоторые переменные. Вы можете настроить эти переменные по своему усмотрению. По умолчанию весь вывод из конвейера будет генерироваться в текущем каталоге.

import os

PIPELINE_NAME = "penguin-transform"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

Подготовить пример данных

Мы загрузим пример набора данных для использования в нашем конвейере TFX. Набор данных мы используем это Palmer Пингвины набор данных .

Однако, в отличие от предыдущих руководств , которые использовали уже препроцессированный набор данных, мы будем использовать сырые Palmer Пингвины набор данных.

Поскольку компонент TFX ExampleGen считывает входные данные из каталога, нам нужно создать каталог и скопировать в него набор данных.

import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_path = 'https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins_size.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_path, _data_filepath)
('/tmp/tfx-dataacmxfq9f/data.csv', <http.client.HTTPMessage at 0x7f5b0ab1bf10>)

Посмотрите, как выглядят необработанные данные.

head {_data_filepath}

Есть несколько записей с отсутствующими значениями , которые представлены в NA . Мы просто удалим эти записи в этом руководстве.

sed -i '/\bNA\b/d' {_data_filepath}
head {_data_filepath}

Вы должны увидеть семь характеристик, описывающих пингвинов. Мы будем использовать тот же набор функций, что и в предыдущих уроках - «culmen_length_mm», «culmen_depth_mm», «flipper_length_mm», «body_mass_g» - и будем предсказывать «вид» пингвина.

Единственное отличие будет заключаться в том, что входные данные не подвергаются предварительной обработке. Обратите внимание, что мы не будем использовать другие функции, такие как «остров» или «секс» в этом руководстве.

Подготовить файл схемы

Как описано в проверке данных с использованием TFX трубопроводов и TensorFlow проверки данных Учебник , нам нужен файл схемы для набора данных. Поскольку набор данных отличается от предыдущего урока, нам нужно сгенерировать его снова. В этом руководстве мы пропустим эти шаги и просто воспользуемся подготовленным файлом схемы.

import shutil

SCHEMA_PATH = 'schema'

_schema_uri = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/schema/raw/schema.pbtxt'
_schema_filename = 'schema.pbtxt'
_schema_filepath = os.path.join(SCHEMA_PATH, _schema_filename)

os.makedirs(SCHEMA_PATH, exist_ok=True)
urllib.request.urlretrieve(_schema_uri, _schema_filepath)
('schema/schema.pbtxt', <http.client.HTTPMessage at 0x7f5b0ab20f50>)

Этот файл схемы был создан с помощью того же конвейера, что и в предыдущем руководстве, без каких-либо изменений вручную.

Создать конвейер

Конвейеры TFX определяются с помощью API Python. Мы добавим Transform компонент трубопровода , который мы создали в руководстве по валидации данных .

Преобразование компоненты требуют ввод данных из ExampleGen компонента и схем из SchemaGen компоненты, и производят «преобразование графика». Выход будет использоваться в Trainer компонента. Преобразование может дополнительно создавать «преобразованные данные», которые являются материализованными данными после преобразования. Однако мы будем преобразовывать данные во время обучения в этом руководстве без материализации промежуточных преобразованных данных.

Единственное , что следует отметить, что нам нужно определить функцию Python, preprocessing_fn , чтобы описать , как входные данные должны быть преобразованы. Это похоже на компонент Trainer, который также требует пользовательского кода для определения модели.

Написать код предварительной обработки и обучения

Нам нужно определить две функции Python. Один для преобразования и один для тренера.


Transform компонент будет найти функцию с именем preprocessing_fn в данном файле модуля , как мы делали для Trainer компонента. Вы также можете задать функцию , используя специфическую preprocessing_fn параметр из Transform компонента.

В этом примере мы сделаем два вида преобразования. Для непрерывных числовых функций , таких как culmen_length_mm и body_mass_g , мы будем нормировать эти значения , используя tft.scale_to_z_score функцию. Для функции метки нам нужно преобразовать строковые метки в числовые значения индекса. Мы будем использовать tf.lookup.StaticHashTable для преобразования.

Для идентификации трансформированных полей легко, мы присоединяем _xf суффикса трансформированных названия компонентов.


Сама модель почти такая же, как и в предыдущих руководствах, но на этот раз мы преобразуем входные данные, используя граф преобразования из компонента Transform.

Еще одно важное отличие от предыдущего руководства заключается в том, что теперь мы экспортируем модель для обслуживания, которая включает не только граф вычислений модели, но также граф преобразования для предварительной обработки, который создается в компоненте Transform. Нам нужно определить отдельную функцию, которая будет использоваться для обслуживания входящих запросов. Вы можете видеть , что та же функция _apply_preprocessing была использована как для тренировочных данных и обслуживающей запрос.

_module_file = 'penguin_utils.py'
%%writefile {_module_file}

from typing import List, Text
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

# Specify features that we will use.
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
_LABEL_KEY = 'species'


# NEW: TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

    inputs: map from feature keys to raw not-yet-transformed features.

    Map from string feature key to transformed feature.
  outputs = {}

  # Uses features defined in _FEATURE_KEYS only.
  for key in _FEATURE_KEYS:
    # tft.scale_to_z_score computes the mean and variance of the given feature
    # and scales the output based on the result.
    outputs[key] = tft.scale_to_z_score(inputs[key])

  # For the label column we provide the mapping from string to index.
  # We could instead use `tft.compute_and_apply_vocabulary()` in order to
  # compute the vocabulary dynamically and perform a lookup.
  # Since in this example there are only 3 possible values, we use a hard-coded
  # table for simplicity.
  table_keys = ['Adelie', 'Chinstrap', 'Gentoo']
  initializer = tf.lookup.KeyValueTensorInitializer(
      values=tf.cast(tf.range(len(table_keys)), tf.int64),
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  outputs[_LABEL_KEY] = table.lookup(inputs[_LABEL_KEY])

  return outputs

# NEW: This function will apply the same transform operation to training data
#      and serving requests.
def _apply_preprocessing(raw_features, tft_layer):
  transformed_features = tft_layer(raw_features)
  if _LABEL_KEY in raw_features:
    transformed_label = transformed_features.pop(_LABEL_KEY)
    return transformed_features, transformed_label
    return transformed_features, None

# NEW: This function will create a handler function which gets a serialized
#      tf.example, preprocess and run an inference with it.
def _get_serve_tf_examples_fn(model, tf_transform_output):
  # We must save the tft_layer to the model to ensure its assets are kept and
  # tracked.
  model.tft_layer = tf_transform_output.transform_features_layer()

      tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  def serve_tf_examples_fn(serialized_tf_examples):
    # Expected input is a string which is serialized tf.Example format.
    feature_spec = tf_transform_output.raw_feature_spec()
    # Because input schema includes unnecessary fields like 'species' and
    # 'island', we filter feature_spec to include required keys only.
    required_feature_spec = {
        k: v for k, v in feature_spec.items() if k in _FEATURE_KEYS
    parsed_features = tf.io.parse_example(serialized_tf_examples,

    # Preprocess parsed input with transform operation defined in
    # preprocessing_fn().
    transformed_features, _ = _apply_preprocessing(parsed_features,
    # Run inference with ML model.
    return model(transformed_features)

  return serve_tf_examples_fn

def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  dataset = data_accessor.tf_dataset_factory(

  transform_layer = tf_transform_output.transform_features_layer()
  def apply_transform(raw_features):
    return _apply_preprocessing(raw_features, transform_layer)

  return dataset.map(apply_transform).repeat()

def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

    A Keras Model.
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [
      keras.layers.Input(shape=(1,), name=key)
      for key in _FEATURE_KEYS
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  return model

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

    fn_args: Holds args used to train the model as name/value pairs.
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
  eval_dataset = _input_fn(

  model = _build_keras_model()

  # NEW: Save a computation graph including transform layer.
  signatures = {
      'serving_default': _get_serve_tf_examples_fn(model, tf_transform_output),
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
Writing penguin_utils.py

Теперь вы выполнили все подготовительные шаги для создания конвейера TFX.

Напишите определение конвейера

Мы определяем функцию для создания конвейера TFX. Pipeline объект представляет собой TFX трубопровод, который может быть запущен с помощью одного из трубопроводов оркестровки систем , которые TFX опор.

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(

  # Import the schema.
  schema_importer = tfx.dsl.Importer(

  # Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(

  # NEW: Transforms input data using preprocessing_fn in the 'module_file'.
  transform = tfx.components.Transform(

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(

      # NEW: Pass transform_graph to the trainer.


  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(

  components = [

      transform,  # NEW: Transform component was added to the pipeline.


  return tfx.dsl.Pipeline(

Запустите конвейер

Мы будем использовать LocalDagRunner , как и в предыдущем уроке.

Вы должны увидеть «INFO: absl: Component Pusher is finished». если конвейер завершился успешно.

Толкатель компонент толкает обученную модель на SERVING_MODEL_DIR которая является serving_model/penguin-transform каталога , если вы не изменили переменные в предыдущих шагах. Вы можете увидеть результат в браузере файлов на левой панели в Colab или с помощью следующей команды:

# List files in created model directory.

Вы также можете проверить подпись сгенерированной модели с использованием saved_model_cli инструмента .

saved_model_cli show --dir {SERVING_MODEL_DIR}/$(ls -1 {SERVING_MODEL_DIR} | sort -nr | head -1) --tag_set serve --signature_def serving_default
The given SavedModel SignatureDef contains the following input(s):
  inputs['examples'] tensor_info:
      dtype: DT_STRING
      shape: (-1)
      name: serving_default_examples:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['output_0'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, 3)
      name: StatefulPartitionedCall_2:0
Method name is: tensorflow/serving/predict

Поскольку мы определили serving_default с нашей собственной serve_tf_examples_fn функции, подписи показывает , что она занимает одну строку. Эта строка сериализованная строка tf.Examples и будет разбираться с tf.io.parse_example () функции , как мы определили ранее (узнать больше о tf.Examples здесь ).

Мы можем загрузить экспортированную модель и попробовать сделать выводы на нескольких примерах.

# Find a model with the latest timestamp.
model_dirs = (item for item in os.scandir(SERVING_MODEL_DIR) if item.is_dir())
model_path = max(model_dirs, key=lambda i: int(i.name)).path

loaded_model = tf.keras.models.load_model(model_path)
inference_fn = loaded_model.signatures['serving_default']
WARNING:tensorflow:Inconsistent references when loading the checkpoint into this object graph. Either the Trackable object references in the Python program have changed in an incompatible way, or the checkpoint was generated in an incompatible program.

Two checkpoint references resolved to different objects (<keras.saving.saved_model.load.TensorFlowTransform>TransformFeaturesLayer object at 0x7f5b0836e3d0> and <keras.engine.input_layer.InputLayer object at 0x7f5b091aa550>).
WARNING:tensorflow:Inconsistent references when loading the checkpoint into this object graph. Either the Trackable object references in the Python program have changed in an incompatible way, or the checkpoint was generated in an incompatible program.

Two checkpoint references resolved to different objects (<keras.saving.saved_model.load.TensorFlowTransform>TransformFeaturesLayer object at 0x7f5b0836e3d0> and <keras.engine.input_layer.InputLayer object at 0x7f5b091aa550>).
# Prepare an example and run inference.
features = {
  'culmen_length_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[49.9])),
  'culmen_depth_mm': tf.train.Feature(float_list=tf.train.FloatList(value=[16.1])),
  'flipper_length_mm': tf.train.Feature(int64_list=tf.train.Int64List(value=[213])),
  'body_mass_g': tf.train.Feature(int64_list=tf.train.Int64List(value=[5400])),
example_proto = tf.train.Example(features=tf.train.Features(feature=features))
examples = example_proto.SerializeToString()

result = inference_fn(examples=tf.constant([examples]))
[[-2.5357873 -3.0600576  3.4993587]]

Ожидается, что третий элемент, соответствующий виду Gentoo, будет самым большим из трех.

Следующие шаги

Если вы хотите узнать больше о Transform компонент, см Transform руководства компонента . Вы можете найти больше ресурсов на https://www.tensorflow.org/tfx/tutorials

См Понимание TFX трубопроводов более узнать о различных концепциях в TFX.