Присоединяйтесь к сообществу SIG TFX-Addons и помогите сделать TFX еще лучше!

Компонент конвейера ExampleGen TFX

Компонент ExampleGen TFX Pipeline принимает данные в конвейеры TFX. Он использует внешние файлы / службы для создания примеров, которые будут считываться другими компонентами TFX. Он также обеспечивает согласованный и настраиваемый раздел и перемешивает набор данных в соответствии с передовой практикой машинного обучения.

  • Потребляет: данные из внешних источников данных, таких как CSV, TFRecord , Avro, Parquet и BigQuery.
  • Выдает: записи tf.Example записи tf.SequenceExample или формат прототипа, в зависимости от формата полезной нагрузки.

ExampleGen и другие компоненты

ExampleGen предоставляет данные компонентам, использующим библиотеку проверки данных TensorFlow , таким как SchemaGen , StatisticsGen и Example Validator . Он также предоставляет данные для преобразования , которое использует библиотеку TensorFlow Transform , и, в конечном итоге, для целей развертывания во время вывода.

Источники данных и форматы

В настоящее время стандартная установка TFX включает полные компоненты ExampleGen для этих источников данных и форматов:

Также доступны специальные исполнители, которые позволяют разрабатывать компоненты ExampleGen для этих источников данных и форматов:

См. Примеры использования в исходном коде и в этом обсуждении для получения дополнительной информации о том, как использовать и разрабатывать собственные исполнители.

Кроме того, эти источники и форматы данных доступны в качестве примеров пользовательских компонентов :

Форматы данных, поддерживаемые Apache Beam

Apache Beam поддерживает прием данных из широкого спектра источников и форматов данных ( см. Ниже ). Эти возможности можно использовать для создания пользовательских компонентов ExampleGen для TFX, что демонстрируется некоторыми существующими компонентами ExampleGen ( см. Ниже ).

Как использовать компонент ExampleGen

Для поддерживаемых источников данных (в настоящее время файлы CSV, файлы TFRecord с tf.Example , tf.SequenceExample и форматом proto, а также результаты запросов BigQuery) компонент конвейера ExampleGen может использоваться непосредственно при развертывании и требует небольшой настройки. Например:

example_gen = CsvExampleGen(input_base='data_root')

или, как показано ниже, для прямого импорта внешнего TFRecord с tf.Example :

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Диапазон, версия и разделение

Span - это группа обучающих примеров. Если ваши данные хранятся в файловой системе, каждый Span может храниться в отдельном каталоге. Семантика Span жестко не запрограммирована в TFX; Интервал может соответствовать дню данных, часу данных или любой другой группировке, которая имеет значение для вашей задачи.

Каждый диапазон может содержать несколько версий данных. В качестве примера: если вы удалите несколько примеров из диапазона для очистки данных низкого качества, это может привести к созданию новой версии этого диапазона. По умолчанию компоненты TFX работают с последней версией в рамках диапазона.

Каждую версию в рамках диапазона можно дополнительно разделить на несколько разделов. Наиболее распространенный вариант разделения диапазона - разделение его на обучающие и оценочные данные.

Пролетные и разделенные участки

Пользовательское разделение ввода / вывода

Чтобы настроить соотношение разделения train / eval, которое будет выводить ExampleGen, установите output_config для компонента ExampleGen. Например:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Обратите внимание, как в этом hash_buckets были установлены hash_buckets .

Для источника ввода, который уже был разделен, установите input_config для компонента ExampleGen:


# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

Для примера gen на основе файла (например, CsvExampleGen и ImportExampleGen) pattern - это pattern относительного файла glob, который сопоставляется с входными файлами с корневым каталогом, заданным входным базовым путем. Для примера генерации запросов (например, BigQueryExampleGen, PrestoExampleGen) pattern - это SQL-запрос.

По умолчанию весь базовый каталог входных данных обрабатывается как единое разделение входных данных, а разделение выходных данных train и eval создается с соотношением 2: 1.

Пожалуйста, обратитесь к proto / example_gen.proto для конфигурации разделения ввода и вывода ExampleGen. И обратитесь к руководству по нисходящим компонентам для использования пользовательских разделений нисходящего потока.

Метод разделения

При использовании hash_buckets разделения hash_buckets вместо всей записи можно использовать функцию разделения примеров. Если функция присутствует, ExampleGen будет использовать ее отпечаток в качестве ключа раздела.

Эта функция может использоваться для поддержания стабильного разделения по отношению к определенным свойствам примеров: например, пользователь всегда будет помещен в один и тот же разделение, если в качестве имени функции раздела был выбран «user_id».

Интерпретация того, что означает «функция» и как сопоставить «функцию» с указанным именем, зависит от реализации ExampleGen и типа примеров.

Для готовых реализаций ExampleGen:

  • Если он генерирует tf.Example, то «функция» означает запись в tf.Example.features.feature.
  • Если он генерирует tf.SequenceExample, то «функция» означает запись в tf.SequenceExample.context.feature.
  • Поддерживаются только функции int64 и bytes.

В следующих случаях ExampleGen выдает ошибки выполнения:

  • Указанное имя функции не существует в примере.
  • Пустая функция: tf.train.Feature() .
  • Неподдерживаемые типы функций, например, плавающие.

Чтобы вывести разделение train / eval на основе функции в примерах, установите output_config для компонента ExampleGen. Например:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Обратите внимание , как partition_feature_name был установлен в этом примере.

Охватывать

Интервал можно получить, используя спецификацию '{SPAN}' во входном шаблоне глобуса :

  • Эта спецификация сопоставляет цифры и отображает данные в соответствующие номера SPAN. Например, «data_ {SPAN} - *. Tfrecord» будет собирать файлы типа «data_12-a.tfrecord», «date_12-b.tfrecord».
  • При желании в этой спецификации можно указать ширину целых чисел при отображении. Например, «data_ {SPAN: 2} .file» сопоставляется с такими файлами, как «data_02.file» и «data_27.file» (как входные данные для Span-2 и Span-27 соответственно), но не сопоставляется с «data_1». file 'и' data_123.file '.
  • Если спецификация SPAN отсутствует, предполагается, что она всегда равна «0».
  • Если указан SPAN, конвейер обработает последний диапазон и сохранит номер диапазона в метаданных.

Например, предположим, что есть входные данные:

  • '/ tmp / span-1 / поезд / данные'
  • '/ tmp / span-1 / eval / данные'
  • '/ tmp / span-2 / поезд / данные'
  • '/ tmp / span-2 / eval / данные'

и входная конфигурация показана ниже:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

при запуске конвейера он будет обрабатывать:

  • '/ tmp / span-2 / train / data' как разделение поездов
  • '/ tmp / span-2 / eval / data' как разделение на eval

с номером диапазона как «2». Если позже '/ tmp / span-3 / ...' будут готовы, просто снова запустите конвейер, и он возьмет диапазон '3' для обработки. Ниже показан пример кода для использования спецификации диапазона:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Получить определенный диапазон можно с помощью RangeConfig, который подробно описан ниже.

Дата

Если ваш источник данных организован в файловой системе по дате, TFX поддерживает сопоставление дат напрямую с номерами диапазона. Есть три спецификации для отображения отображения дат в промежутки: {ГГГГ}, {ММ} и {ДД}:

  • Все три спецификации должны присутствовать во входном шаблоне глобуса, если таковой указан:
  • Можно указать либо спецификацию {SPAN}, либо этот набор спецификаций даты.
  • Вычисляется календарная дата с годом от YYYY, месяц с MM и день месяца с DD, затем число диапазона вычисляется как количество дней с эпохи unix (т. Е. 1970-01-01). Например, 'log- {YYYY} {MM} {DD} .data' соответствует файлу 'log-19700101.data' и использует его как входные данные для Span-0, а 'log-20170101.data' как входные данные для Спан-17167.
  • Если указан этот набор спецификаций даты, конвейер обработает самую последнюю дату и сохранит соответствующий номер диапазона в метаданных.

Например, предположим, что есть входные данные, упорядоченные по календарной дате:

  • '/ tmp / 1970-01-02 / поезд / данные'
  • '/ tmp / 1970-01-02 / eval / data'
  • '/ tmp / 1970-01-03 / поезд / данные'
  • '/ tmp / 1970-01-03 / eval / data'

и входная конфигурация показана ниже:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

при запуске конвейера он будет обрабатывать:

  • '/ tmp / 1970-01-03 / train / data' как разделенный поезд
  • '/ tmp / 1970-01-03 / eval / data' как разделение на eval

с номером диапазона как «2». Если позже '/ tmp / 1970-01-04 / ...' будут готовы, просто снова запустите конвейер, и он возьмет интервал '3' для обработки. Ниже показан пример кода для использования спецификации даты:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Версия

Версию можно получить, используя спецификацию '{VERSION}' во входном шаблоне глобуса :

  • Эта спецификация сопоставляет цифры и сопоставляет данные с соответствующими номерами ВЕРСИИ под SPAN. Обратите внимание, что спецификацию версии можно использовать в сочетании со спецификацией Span или Date.
  • Эта спецификация также может быть дополнительно указана с шириной так же, как и в спецификации SPAN. например, "span- {SPAN} / version- {VERSION: 4} / data- *".
  • Если спецификация VERSION отсутствует, для версии устанавливается значение None.
  • Если указаны оба параметра SPAN и VERSION, конвейер обработает последнюю версию для последнего диапазона и сохранит номер версии в метаданных.
  • Если указана VERSION, но не SPAN (или дата), будет выдана ошибка.

Например, предположим, что есть входные данные:

  • '/ tmp / span-1 / ver-1 / поезд / данные'
  • '/ tmp / span-1 / ver-1 / eval / data'
  • '/ tmp / span-2 / ver-1 / поезд / данные'
  • '/ tmp / span-2 / ver-1 / eval / данные'
  • '/ tmp / span-2 / ver-2 / поезд / данные'
  • '/ tmp / span-2 / ver-2 / eval / данные'

и входная конфигурация показана ниже:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

при запуске конвейера он будет обрабатывать:

  • '/ tmp / span-2 / ver-2 / train / data' как разделенный поезд
  • '/ tmp / span-2 / ver-2 / eval / data' как разделение на eval

с номером диапазона как «2» и номером версии как «2». Если позже '/ tmp / span-2 / ver-3 / ...' будут готовы, просто снова запустите конвейер, и он возьмет диапазон '2' и версию '3' для обработки. Ниже показан пример кода для использования спецификации версии:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Конфигурация диапазона

TFX поддерживает извлечение и обработку определенного диапазона в файловом ExampleGen, используя конфигурацию диапазона, абстрактную конфигурацию, используемую для описания диапазонов для различных объектов TFX. Чтобы получить определенный диапазон, установите range_config для файлового компонента ExampleGen. Например, предположим, что есть входные данные:

  • '/ tmp / span-01 / поезд / данные'
  • '/ tmp / span-01 / eval / данные'
  • '/ tmp / span-02 / поезд / данные'
  • '/ tmp / span-02 / eval / данные'

Чтобы специально извлекать и обрабатывать данные с диапазоном '1', мы указываем конфигурацию диапазона в дополнение к входной конфигурации. Обратите внимание, что ExampleGen поддерживает только статические диапазоны с одним диапазоном (чтобы указать обработку определенных отдельных диапазонов). Таким образом, для StaticRange start_span_number должен равняться end_span_number. Используя предоставленный диапазон и информацию о ширине диапазона (если предоставлена) для заполнения нулями, ExampleGen заменит спецификацию SPAN в предоставленных шаблонах разделения на желаемый номер диапазона. Пример использования показан ниже:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Конфигурация диапазона также может использоваться для обработки определенных дат, если спецификация даты используется вместо спецификации SPAN. Например, предположим, что есть входные данные, упорядоченные по календарной дате:

  • '/ tmp / 1970-01-02 / поезд / данные'
  • '/ tmp / 1970-01-02 / eval / data'
  • '/ tmp / 1970-01-03 / поезд / данные'
  • '/ tmp / 1970-01-03 / eval / data'

Чтобы получить и обработать данные 2 января 1970 г., мы делаем следующее:

from  tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Пользовательский ExampleGen

Если доступные в настоящее время компоненты ExampleGen не соответствуют вашим потребностям, вы можете создать собственный ExampleGen, который позволит вам читать из разных источников данных или в разных форматах данных.

Настройка примера на основе файлов (экспериментальная)

Во-первых, расширьте BaseExampleGenExecutor с помощью настраиваемого Beam PTransform, который обеспечивает преобразование из вашего разделения ввода train / eval в примеры TF. Например, исполнитель CsvExampleGen обеспечивает преобразование из входного разбиения CSV в примеры TF.

Затем создайте компонент с указанным выше исполнителем, как это сделано в компоненте CsvExampleGen . В качестве альтернативы можно передать настраиваемого исполнителя в стандартный компонент ExampleGen, как показано ниже.

from tfx.components.base import executor_spec
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Теперь мы также поддерживаем чтение файлов Avro и Parquet с помощью этого метода .

Дополнительные форматы данных

Apache Beam поддерживает чтение ряда дополнительных форматов данных . через Beam I / O Transforms. Вы можете создавать собственные компоненты ExampleGen, используя Beam I / O Transforms, используя шаблон, аналогичный примеру Avro.

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))

На момент написания этой статьи поддерживаемые в настоящее время форматы и источники данных для Beam Python SDK включают:

  • Amazon S3
  • Apache Avro
  • Apache Hadoop
  • Апач Кафка
  • Паркет Apache
  • Google Cloud BigQuery
  • Google Cloud BigTable
  • Google Cloud Datastore
  • Google Cloud Pub / Sub
  • Облачное хранилище Google (GCS)
  • MongoDB

Последний список можно найти в документации по Beam .

Настройка примера на основе запроса (экспериментальная)

Во-первых, расширите BaseExampleGenExecutor с помощью настраиваемого Beam PTransform, который читает из внешнего источника данных. Затем создайте простой компонент, расширив QueryBasedExampleGen.

Это может потребовать, а может и не потребовать дополнительных настроек подключения. Например, программа- исполнитель BigQuery читает, используя коннектор beam.io по умолчанию, который абстрагирует детали конфигурации подключения. Для исполнителя Presto требуется пользовательский PTransform Beam и пользовательский protobuf конфигурации подключения в качестве входных данных.

Если для настраиваемого компонента ExampleGen требуется конфигурация подключения, создайте новый protobuf и передайте его через custom_config, который теперь является необязательным параметром выполнения. Ниже приведен пример использования настроенного компонента.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Компоненты нисходящего потока ExampleGen

Пользовательская конфигурация разделения поддерживается для последующих компонентов.

СтатистикаGen

По умолчанию выполняется генерация статистики для всех сплитов.

Чтобы исключить какие-либо разбиения, установите exclude_splits для компонента StatisticsGen. Например:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchemaGen

По умолчанию схема создается на основе всех разбиений.

Чтобы исключить любые разбиения, установите exclude_splits для компонента SchemaGen. Например:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

ExampleValidator

Поведение по умолчанию - проверять статистику всех разбиений на примерах ввода на соответствие схеме.

Чтобы исключить любые разделения, установите exclude_splits для компонента ExampleValidator. Например:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Преобразовать

Поведение по умолчанию - анализировать и генерировать метаданные из «поезда» разбиения и преобразовывать все разбиения.

Чтобы указать разбиения анализа и разбиения преобразования, установите splits_config для компонента Transform. Например:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Тренер и тюнер

Поведение по умолчанию - тренировка на сплите train и оценка на сплите eval.

Чтобы указать разделение поезда и оценить разделение, установите параметры train_args и eval_args для компонента Trainer. Например:

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

Оценщик

Поведение по умолчанию - предоставление метрик, вычисленных на разбиении eval.

Чтобы вычислить статистику оценки для пользовательских разделений, установите example_splits для компонента Evaluator. Например:

# Compute metrics on the 'eval1' split and the 'eval2' split.
Trainer = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

Дополнительные сведения доступны в справочнике по API CsvExampleGen, справочнике по API FileBasedExampleGen и справочнике по API ImportExampleGen .