Присоединяйтесь к сообществу SIG TFX-Addons и помогите сделать TFX еще лучше!

Начать работу с TensorFlow Transform

В этом руководстве представлены основные концепции tf.Transform и способы их использования. Так и будет:

  • Определите функцию предварительной обработки , логическое описание конвейера, который преобразует необработанные данные в данные, используемые для обучения модели машинного обучения.
  • Покажите реализацию Apache Beam, используемую для преобразования данных путем преобразования функции предварительной обработки в конвейер Beam .
  • Показать дополнительные примеры использования.

Определите функцию предварительной обработки

Функция предварительной обработки - самая важная концепция tf.Transform . Функция предварительной обработки - это логическое описание преобразования набора данных. Функция предварительной обработки принимает и возвращает словарь тензоров, где тензор означает Tensor или SparseTensor . Для определения функции предварительной обработки используются два вида функций:

  1. Любая функция, которая принимает и возвращает тензоры. Они добавляют к графу операции TensorFlow, которые преобразуют необработанные данные в преобразованные.
  2. Любой из анализаторов, предоставляемых tf.Transform . Анализаторы также принимают и возвращают тензоры, но, в отличие от функций TensorFlow, они не добавляют операций к графу. Вместо этого анализаторы заставляют tf.Transform вычислять операцию полного прохода вне TensorFlow. Они используют значения входного тензора по всему набору данных для генерации постоянного тензора, который возвращается в качестве выходных данных. Например, tft.min вычисляет минимум тензора для набора данных. tf.Transform предоставляет фиксированный набор анализаторов, но он будет расширен в будущих версиях.

Пример функции предварительной обработки

Комбинируя анализаторы и обычные функции TensorFlow, пользователи могут создавать гибкие конвейеры для преобразования данных. Следующая функция предварительной обработки преобразует каждую из трех функций по-разному и объединяет две функции:

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

Здесь x , y и s - это Tensor которые представляют входные функции. Первый новый созданный тензор, x_centered , x_centered путем применения tft.mean к x и вычитания его из x . tft.mean(x) возвращает тензор, представляющий среднее значение тензора x . x_centered - тензор x с вычтенным средним.

Второй новый тензор, y_normalized , создается аналогичным образом, но с использованием удобного метода tft.scale_to_0_1 . Этот метод делает что-то похожее на вычисление x_centered , а именно вычисляет максимум и минимум и использует их для масштабирования y .

Тензор s_integerized показывает пример манипуляции строкой. В этом случае мы берем строку и сопоставляем ее с целым числом. При этом используетсяtft.compute_and_apply_vocabulary функцияtft.compute_and_apply_vocabulary . Эта функция использует анализатор для вычисления уникальных значений, принимаемых входными строками, а затем использует операции TensorFlow для преобразования входных строк в индексы в таблице уникальных значений.

Последний столбец показывает, что можно использовать операции TensorFlow для создания новых функций путем комбинирования тензоров.

Функция предварительной обработки определяет конвейер операций с набором данных. Чтобы применить конвейер, мы полагаемся на конкретную реализацию tf.Transform API. Реализация Apache Beam предоставляет PTransform который применяет функцию предварительной обработки пользователя к данным. Типичный рабочий tf.Transform пользователя tf.Transform создает функцию предварительной обработки, а затем включает ее в более крупный конвейер Beam, создавая данные для обучения.

Дозирование

Пакетная обработка - важная часть TensorFlow. Поскольку одной из целей tf.Transform является предоставление графа tf.Transform для предварительной обработки, который может быть включен в граф обслуживания (и, необязательно, граф обучения), пакетная обработка также является важной концепцией в tf.Transform .

Хотя это и не очевидно в приведенном выше примере, определяемой пользователем функции предварительной обработки передаются тензоры, представляющие пакеты, а не отдельные экземпляры, как это происходит во время обучения и обслуживания с TensorFlow. С другой стороны, анализаторы выполняют вычисление по всему набору данных, которое возвращает одно значение, а не пакет значений. x - это Tensor с формой (batch_size,) , а tft.mean(x) - это Tensor с формой () . Вычитание x - tft.mean(x) передает, где значение tft.mean(x) вычитается из каждого элемента пакета, представленного x .

Реализация Apache Beam

Хотя функция предварительной обработки предназначена как логическое описание конвейера предварительной обработки, реализованного на нескольких платформах обработки данных, tf.Transform предоставляет каноническую реализацию, используемую в Apache Beam. Эта реализация демонстрирует функциональные возможности, требуемые от реализации. Формального API для этой функции не существует, поэтому каждая реализация может использовать API, который является идиоматическим для ее конкретной платформы обработки данных.

Реализация Apache Beam предоставляет два PTransform используемых для обработки данных для функции предварительной обработки. Ниже показано использование составного PTransform AnalyzeAndTransformDataset :

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

Содержимое transformed_data показано ниже и содержит преобразованные столбцы в том же формате, что и необработанные данные. В частности, значения s_integerized равны [0, 1, 0] эти значения зависят от того, как слова hello и world были сопоставлены с целыми числами, что является детерминированным. Для столбца x_centered мы x_centered среднее значение, поэтому значения столбца x , которые были [1.0, 2.0, 3.0] , стали [-1.0, 0.0, 1.0] . Точно так же остальные столбцы соответствуют ожидаемым значениям.

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

И raw_data и transformed_data являются наборами данных. В следующих двух разделах показано, как реализация Beam представляет наборы данных и как читать и записывать данные на диск. Другое возвращаемое значение, transform_fn , представляет преобразование, примененное к данным, подробно описанное ниже.

AnalyzeAndTransformDataset - это композиция двух фундаментальных преобразований, предоставляемых реализациями AnalyzeDataset и TransformDataset . Таким образом, следующие два фрагмента кода эквивалентны:

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn - это чистая функция, которая представляет операцию, которая применяется к каждой строке набора данных. В частности, значения анализатора уже вычислены и рассматриваются как константы. В этом примере transform_fn содержит в качестве констант среднее значение столбца x , минимальное и максимальное значения столбца y и словарь, используемый для сопоставления строк с целыми числами.

Важной особенностью tf.Transform является то, что transform_fn представляет карту по строкам - это чистая функция, применяемая к каждой строке отдельно. Все вычисления для агрегирования строк выполняются в AnalyzeDataset . Кроме того, transform_fn представлен как TensorFlow Graph который можно встроить в обслуживающий граф.

AnalyzeAndTransformDataset предоставляется для оптимизации в этом особом случае. Этот же шаблон используется в scikit-learn , предоставляя методы fit , transform и fit_transform .

Форматы данных и схема

Реализация TFT Beam принимает два разных формата входных данных. Формат «instance dict» (как показано в примере выше и в simple_example.py ) является интуитивно понятным форматом и подходит для небольших наборов данных, тогда как формат TFXIO ( Apache Arrow ) обеспечивает улучшенную производительность и подходит для больших наборов данных.

Реализация Beam сообщает, в каком формате будет входная коллекция PCollection, с помощью «метаданных», сопровождающих коллекцию PCollection:

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • Если raw_data_metadata - это dataset_metadata.DatasetMetadata (см. Ниже, раздел «Формат dict экземпляра»), то ожидается, что raw_data будет в формате «instance dict».
  • Если raw_data_metadata является tfxio.TensorAdapterConfig (смотри ниже, «Формат TFXIO» секция), то raw_data как ожидается, будет в формате TFXIO.

Формат "instance dict"

В предыдущих примерах кода код, определяющий raw_data_metadata , опущен. Метаданные содержат схему, которая определяет структуру данных, чтобы они считывались и записывались в различных форматах. Даже формат в памяти, показанный в последнем разделе, не является самоописывающим и требует схемы, чтобы его можно было интерпретировать как тензоры.

Вот определение схемы для данных примера:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

Протокол Schema содержит информацию, необходимую для синтаксического анализа данных из формата на диске или в памяти в тензоры. Обычно он schema_utils.schema_from_feature_spec путем вызова schema_utils.schema_from_feature_spec с ключами функции сопоставления dict со значениями tf.io.FixedLenFeature , tf.io.VarLenFeature и tf.io.SparseFeature . См. Документацию для tf.parse_example для получения более подробной информации.

Выше мы использовали tf.io.FixedLenFeature чтобы указать, что каждая функция содержит фиксированное количество значений, в данном случае одно скалярное значение. Поскольку tf.Transform экземпляры в пакеты, фактический Tensor представляющий объект, будет иметь форму (None,) где неизвестное измерение является измерением пакета.

Формат TFXIO

В этом формате ожидается, что данные будут содержаться в pyarrow.RecordBatch . Для табличных данных наша реализация Apache Beam принимает RecordBatch Arrow RecordBatch которые состоят из столбцов следующих типов:

  • pa.list_(<primitive>) , где <primitive> - это pa.int64() , pa.float32() pa.binary() или pa.large_binary() .

  • pa.large_list(<primitive>)

Набор входных данных игрушки, который мы использовали выше, когда он представлен как RecordBatch , выглядит следующим образом:

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

Подобно тому, как DatasetMetadata требуется для сопровождения формата «instance dict», требуется tfxio.TensorAdapterConfig для сопровождения RecordBatch es. Он состоит из схемы Arrow для RecordBatch es и TensorRepresentations чтобы однозначно определить, как столбцы в RecordBatch es могут интерпретироваться как TensorFlow Tensors (включая, помимо прочего, tf.Tensor, tf.SparseTensor).

TensorRepresentations - это Dict[Text, TensorRepresentation] который устанавливает связь между Tensor, который принимает preprocessing_fn и столбцами в RecordBatch es. Например:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

Означает, что inputs['x'] в preprocessing_fn должен быть плотным tf.Tensor, значения которого берутся из столбца с именем 'col1' во входных RecordBatch es, а его (пакетная) форма должна быть [batch_size, 2] .

TensorRepresentation - это Protobuf, определенный в метаданных TensorFlow.

Совместимость с TensorFlow

tf.Transform обеспечивает поддержку экспорта transform_fn выше либо как TF 1.x, либо как TF 2.x SavedModel. Поведение по умолчанию до того, как выпуск 0.30 экспортировал SavedModel TF 1.x. Начиная с версии 0.30 , по умолчанию выполняется экспорт SavedModel TF 2.x, если поведение TF 2.x не отключено явным образом (например, путем вызова tf.compat.v1.disable_v2_behavior() ).

При использовании таких концепций TF 1.x, как Estimators и Sessions , вы можете сохранить предыдущее поведение, передав force_tf_compat_v1=True в tft_beam.Context при использовании tf.Transform в качестве отдельной библиотеки или в компонент Transform в TFX.

При экспорте transform_fn как SavedModel TF 2.x ожидается, что preprocessing_fn будет отслеживаться с помощью tf.function . Кроме того, при удаленном запуске конвейера (например, с помощью DataflowRunner ) убедитесь, что preprocessing_fn и все зависимости правильно упакованы, как описано здесь .

Известные проблемы с использованием tf.Transform экспортировать 2.x TF SavedModel документируются здесь .

Ввод и вывод с помощью Apache Beam

До сих пор мы видели входные и выходные данные в списках Python (из RecordBatch es или словарей экземпляров). Это упрощение, основанное на способности Apache Beam работать со списками, а также на его основном представлении данных - PCollection .

PCollection - это представление данных, которое является частью конвейера Beam. Конвейер Beam формируется путем применения различных PTransform , включая AnalyzeDataset и TransformDataset , и запуска конвейера. PCollection не создается в памяти основного двоичного PCollection , а вместо этого распределяется между рабочими (хотя в этом разделе используется режим выполнения в памяти).

Предварительно PCollection исходники TFXIO ( TFXIO )

Формат RecordBatch , который принимает наша реализация, является распространенным форматом, который принимают другие библиотеки TFX. Поэтому TFX предлагает удобные «источники» (также известные как TFXIO ), которые читают файлы различных форматов на диске и создают RecordBatch а также могут передавать TensorAdapterConfig , включая предполагаемые TensorRepresentations .

Эти TFXIO можно найти в пакете tfx_bsl ( tfx_bsl.public.tfxio ).

Пример: набор данных "Доход переписи"

В следующем примере требуется как чтение, так и запись данных на диск, а также представление данных в виде PCollection (а не списка), см. census_example.py . Ниже мы покажем, как загрузить данные и запустить этот пример. Набор данных «Census Income» предоставляется репозиторием UCI Machine Learning Repository . Этот набор данных содержит как категориальные, так и числовые данные.

Данные в формате CSV, вот первые две строки:

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

Столбцы набора данных могут быть категориальными или числовыми. Этот набор данных описывает проблему классификации: прогнозирование последнего столбца, в котором человек зарабатывает более или менее 50 тысяч в год. Однако с точки зрения tf.Transform эта метка - просто еще один категориальный столбец.

Мы используем предварительно подготовленный TFXIO , BeamRecordCsvTFXIO чтобы преобразовать строки CSV в RecordBatches . TFXIO требует двух важных сведений:

  • Схема метаданных TensorFlow, которая содержит информацию о типе и форме каждого столбца CSV. TensorRepresentation являются необязательной частью схемы; если они не указаны (как в этом примере), они будут выведены из информации о типе и форме. Схему можно получить либо с помощью вспомогательной функции, которую мы предоставляем для перевода из спецификаций синтаксического анализа TF (показанных в этом примере), либо путем запуска TensorFlow Data Validation .

  • список имен столбцов в том порядке, в котором они появляются в файле CSV. Обратите внимание, что эти имена должны совпадать с именами функций в схеме.

В этом примере мы позволяем отсутствовать функции education-num . Это означает, что он представлен как tf.io.VarLenFeature в feature_spec и как tf.SparseTensor в preprocessing_fn . Другие функции станут tf.Tensor с тем же именем в tf.Tensor preprocessing_fn .

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

Обратите внимание, что нам пришлось внести некоторые дополнительные исправления после того, как строки CSV были прочитаны. В противном случае мы могли бы полагаться на CsvTFXIO для обработки как чтения файлов, так и перевода в RecordBatch es:

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

Предварительная обработка аналогична предыдущему примеру, за исключением того, что функция предварительной обработки генерируется программно вместо того, чтобы вручную указывать каждый столбец. В функции предварительной обработки ниже NUMERICAL_COLUMNS и CATEGORICAL_COLUMNS - это списки, содержащие имена числовых и категориальных столбцов:

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

Одним из отличий от предыдущего примера является то, что столбец метки вручную определяет сопоставление строки с индексом. Таким образом, '>50' отображается в 0 а '<=50K' отображается в 1 потому что полезно знать, какой индекс в обученной модели соответствует какой метке.

Переменная record_batches представляет собой PCollection pyarrow.RecordBatch . tensor_adapter_config задается csv_tfxio , который выводится из SCHEMA (и, в конечном счете, в этом примере из спецификаций синтаксического анализа TF).

Заключительный этап - это запись преобразованных данных на диск, который имеет форму, аналогичную чтению необработанных данных. Схема, используемая для этого, является частью выходных данных AnalyzeAndTransformDataset которая определяет схему для выходных данных. Код для записи на диск показан ниже. Схема является частью метаданных, но использует их взаимозаменяемо в tf.Transform API (т. ExampleProtoCoder метаданные в ExampleProtoCoder ). Имейте в виду, что это записывает в другой формат. Вместо textio.WriteToText используйте встроенную поддержку TFRecord формата TFRecord и используйте кодировщик для кодирования данных в качестве Example протоколов. Этот формат лучше использовать для обучения, как показано в следующем разделе. transformed_eval_data_base предоставляет базовое имя файла для отдельных записываемых сегментов.

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

В дополнение к обучающим данным, transform_fn также записывается с метаданными:

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

Запустите весь конвейер Beam с помощью p.run().wait_until_finish() . До этого момента конвейер Beam представляет отложенные распределенные вычисления. Он содержит инструкции о том, что будет сделано, но инструкции не были выполнены. Этот последний вызов выполняет указанный конвейер.

Скачать набор данных переписи

Загрузите набор данных переписи, используя следующие команды оболочки:

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

При запуске скрипта census_example.py передайте каталог, содержащий эти данные, в качестве первого аргумента. Сценарий создает временный подкаталог для добавления предварительно обработанных данных.

Интеграция с TensorFlow Training

Последний раздел census_example.py показывает, как предварительно обработанные данные используются для обучения модели. Подробную информацию см. В документации к Оценщикам . Первым шагом является создание Estimator который требует описания предварительно обработанных столбцов. Каждый числовой столбец описывается как real_valued_column который является оболочкой вокруг плотного вектора фиксированного размера ( 1 в этом примере). Каждый категориальный столбец преобразуется из строки в целые числа, а затем передается в indicator_column . tft.TFTransformOutput используется для поиска пути к файлу словаря для каждой категориальной функции.

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

Следующим шагом является создание конструктора для генерации входной функции для обучения и оценки. Это отличается от обучения, используемого tf.Learn поскольку для анализа преобразованных данных не требуется спецификация функции. Вместо этого используйте метаданные для преобразованных данных для создания спецификации функции.

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

Остающийся код такой же, как и при использовании класса Estimator . Пример также содержит код для экспорта модели в формате SavedModel . Экспортированная модель может использоваться Tensorflow Serving или Cloud ML Engine .