Присоединяйтесь к TensorFlow на Google I/O, 11-12 мая Зарегистрируйтесь сейчас

Компонент конвейера ExampleGen TFX

Компонент ExampleGen TFX Pipeline принимает данные в конвейеры TFX. Он использует внешние файлы/сервисы для создания примеров, которые будут считываться другими компонентами TFX. Он также обеспечивает согласованный и настраиваемый раздел и перемешивает набор данных в соответствии с передовой практикой машинного обучения.

  • Потребляет: данные из внешних источников данных, таких как CSV, TFRecord , Avro, Parquet и BigQuery.
  • Выдает: записи tf.Example , записи tf.SequenceExample или протоформат, в зависимости от формата полезной нагрузки.

ExampleGen и другие компоненты

ExampleGen предоставляет данные компонентам, использующим библиотеку проверки данных TensorFlow, таким как SchemaGen , StatisticsGen и Example Validator . Он также предоставляет данные для Transform , который использует библиотеку TensorFlow Transform , и, в конечном счете, для целей развертывания во время логического вывода.

Источники данных и форматы

В настоящее время стандартная установка TFX включает полные компоненты ExampleGen для следующих источников данных и форматов:

Также доступны пользовательские исполнители, которые позволяют разрабатывать компоненты ExampleGen для следующих источников данных и форматов:

См. примеры использования в исходном коде и это обсуждение для получения дополнительной информации о том, как использовать и разрабатывать собственные исполнители.

Кроме того, эти источники данных и форматы доступны в качестве примеров пользовательских компонентов :

Прием форматов данных, поддерживаемых Apache Beam

Apache Beam поддерживает прием данных из широкого спектра источников данных и форматов ( см. ниже ). Эти возможности можно использовать для создания пользовательских компонентов ExampleGen для TFX, что демонстрируют некоторые существующие компоненты ExampleGen ( см. ниже ).

Как использовать компонент ExampleGen

Для поддерживаемых источников данных (в настоящее время файлы CSV, файлы TFRecord с tf.Example , tf.SequenceExample и proto, а также результаты запросов BigQuery) компонент конвейера ExampleGen можно использовать непосредственно при развертывании и требует небольшой настройки. Например:

example_gen = CsvExampleGen(input_base='data_root')

или, как показано ниже, для прямого импорта внешнего TFRecord с tf.Example :

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Диапазон, версия и разделение

Span — это группа обучающих примеров. Если ваши данные сохраняются в файловой системе, каждый Span может храниться в отдельном каталоге. Семантика Span не жестко запрограммирована в TFX; Span может соответствовать дню данных, часу данных или любой другой группе, значимой для вашей задачи.

Каждый диапазон может содержать несколько версий данных. Чтобы привести пример, если вы удалите некоторые примеры из Span, чтобы очистить данные низкого качества, это может привести к новой версии этого Span. По умолчанию компоненты TFX работают с последней версией в диапазоне.

Каждая версия в пределах диапазона может быть далее подразделена на несколько разделов. Наиболее распространенным вариантом использования для разделения Span является разделение его на данные обучения и оценки.

Пролеты и разделения

Пользовательское разделение ввода/вывода

Чтобы настроить соотношение разделения train/eval, которое будет выводить ExampleGen, установите output_config для компонента ExampleGen. Например:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Обратите внимание, как в этом примере были установлены hash_buckets .

Для источника ввода, который уже был разделен, установите input_config для компонента ExampleGen:


# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

Для генерации примеров на основе файлов (например, CsvExampleGen и ImportExampleGen) pattern представляет собой относительный шаблон файла глобуса, который сопоставляется с входными файлами с корневым каталогом, заданным входным базовым путем. Для генерации примеров на основе запросов (например, BigQueryExampleGen, PrestoExampleGen) pattern представляет собой SQL-запрос.

По умолчанию весь входной базовый каталог обрабатывается как один входной разделитель, а выходной раздел поезда и eval генерируется с соотношением 2:1.

Пожалуйста, обратитесь к proto/example_gen.proto для конфигурации разделения ввода и вывода ExampleGen. И обратитесь к руководству по нижестоящим компонентам для использования пользовательских разделений ниже по течению.

Метод разделения

При использовании метода разделения hash_buckets вместо всей записи можно использовать функцию разделения примеров. Если функция присутствует, ExampleGen будет использовать отпечаток этой функции в качестве ключа раздела.

Эту функцию можно использовать для поддержания стабильного разделения по отношению к определенным свойствам примеров: например, пользователь всегда будет помещаться в одно и то же разделение, если в качестве имени функции раздела было выбрано «user_id».

Интерпретация того, что означает «функция» и как сопоставить «функция» с указанным именем, зависит от реализации ExampleGen и типа примеров.

Для готовых реализаций ExampleGen:

  • Если он генерирует tf.Example, то «функция» означает запись в tf.Example.features.feature.
  • Если он генерирует tf.SequenceExample, то «функция» означает запись в tf.SequenceExample.context.feature.
  • Поддерживаются только функции int64 и bytes.

В следующих случаях ExampleGen выдает ошибки времени выполнения:

  • Указанное имя функции не существует в примере.
  • Пустая функция: tf.train.Feature() .
  • Неподдерживаемые типы элементов, например плавающие элементы.

Чтобы вывести разделение train/eval на основе функции в примерах, установите output_config для компонента ExampleGen. Например:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Обратите внимание, как в этом примере задано partition_feature_name .

Охватывать

Span можно получить, используя спецификацию {SPAN} во входном шаблоне glob :

  • Эта спецификация сопоставляет цифры и сопоставляет данные с соответствующими номерами SPAN. Например, «data_{SPAN}-*.tfrecord» соберет такие файлы, как «data_12-a.tfrecord», «date_12-b.tfrecord».
  • При желании эту спецификацию можно указать с шириной целых чисел при отображении. Например, «data_{SPAN:2}.file» сопоставляется с такими файлами, как «data_02.file» и «data_27.file» (в качестве входных данных для Span-2 и Span-27 соответственно), но не сопоставляется с «data_1». файл» или «data_123.file».
  • Когда спецификация SPAN отсутствует, предполагается, что она всегда равна Span '0'.
  • Если указан SPAN, конвейер будет обрабатывать последний диапазон и сохранять номер диапазона в метаданных.

Например, предположим, что есть входные данные:

  • '/tmp/span-1/поезд/данные'
  • '/tmp/span-1/eval/данные'
  • '/tmp/span-2/поезд/данные'
  • '/tmp/span-2/eval/данные'

и входная конфигурация показана ниже:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

при запуске конвейера он будет обрабатывать:

  • '/tmp/span-2/train/data' как разделение поезда
  • '/tmp/span-2/eval/data' как разделение eval

с номером пролета как «2». Если позже '/tmp/span-3/...' будет готов, просто снова запустите конвейер, и он выберет для обработки диапазон '3'. Ниже показан пример кода для использования спецификации span:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Получить определенный диапазон можно с помощью RangeConfig, подробно описанного ниже.

Дата

Если ваш источник данных организован в файловой системе по дате, TFX поддерживает сопоставление дат непосредственно с числами диапазонов. Существует три спецификации для представления сопоставления дат с интервалами: {ГГГГ}, {ММ} и {ДД}:

  • Три спецификации должны полностью присутствовать во входном шаблоне глобуса, если они указаны:
  • Можно указать либо спецификацию {SPAN}, либо этот набор спецификаций даты.
  • Рассчитывается календарная дата с годом от ГГГГ, месяцем от ММ и днем ​​месяца от ДД, затем число интервала рассчитывается как количество дней с эпохи unix (т.е. 1970-01-01). Например, "log-{ГГГГ}{ММ}{ДД}.data" соответствует файлу "log-19700101.data" и использует его в качестве входных данных для Span-0, а "log-20170101.data" в качестве входных данных для Пролет-17167.
  • Если указан этот набор спецификаций даты, конвейер будет обрабатывать самую последнюю дату и сохранять соответствующий номер диапазона в метаданных.

Например, предположим, что есть входные данные, упорядоченные по календарной дате:

  • '/tmp/1970-01-02/поезд/данные'
  • '/tmp/1970-01-02/eval/данные'
  • '/tmp/1970-01-03/поезд/данные'
  • '/tmp/1970-01-03/eval/данные'

и входная конфигурация показана ниже:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

при запуске конвейера он будет обрабатывать:

  • '/tmp/1970-01-03/train/data' как разделение поезда
  • '/tmp/1970-01-03/eval/data' как разделение eval

с номером пролета как «2». Если позже '/tmp/1970-01-04/...' будет готово, просто снова запустите конвейер, и он выберет диапазон '3' для обработки. Ниже показан пример кода для использования спецификации даты:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Версия

Версию можно получить, используя спецификацию {VERSION} во входном шаблоне glob :

  • Эта спецификация сопоставляет цифры и сопоставляет данные с соответствующими номерами ВЕРСИИ в SPAN. Обратите внимание, что спецификацию Version можно использовать в сочетании со спецификацией Span или Date.
  • Эта спецификация также может быть опционально указана с шириной так же, как спецификация SPAN. например, 'span-{SPAN}/версия-{VERSION:4}/data-*'.
  • Если спецификация VERSION отсутствует, для версии устанавливается значение None.
  • Если указаны оба SPAN и VERSION, конвейер будет обрабатывать последнюю версию для последнего диапазона и сохранит номер версии в метаданных.
  • Если указана VERSION, но не SPAN (или спецификация даты), будет выдана ошибка.

Например, предположим, что есть входные данные:

  • '/tmp/span-1/ver-1/train/data'
  • '/tmp/span-1/ver-1/eval/данные'
  • '/tmp/span-2/ver-1/train/data'
  • '/tmp/span-2/ver-1/eval/данные'
  • '/tmp/span-2/ver-2/train/data'
  • '/tmp/span-2/ver-2/eval/данные'

и входная конфигурация показана ниже:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

при запуске конвейера он будет обрабатывать:

  • '/tmp/span-2/ver-2/train/data' как разделение поезда
  • '/tmp/span-2/ver-2/eval/data' как разделение eval

с номером пролета как «2» и номером версии как «2». Если позднее «/tmp/span-2/ver-3/...» будут готовы, просто снова запустите конвейер, и он выберет диапазон «2» и версию «3» для обработки. Ниже показан пример кода для использования спецификации версии:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Конфигурация диапазона

TFX поддерживает извлечение и обработку определенного диапазона в файле ExampleGen с использованием конфигурации диапазона, абстрактной конфигурации, используемой для описания диапазонов для различных объектов TFX. Чтобы получить конкретный диапазон, установите range_config для файлового компонента ExampleGen. Например, предположим, что есть входные данные:

  • '/tmp/span-01/поезд/данные'
  • '/tmp/span-01/eval/данные'
  • '/tmp/span-02/поезд/данные'
  • '/tmp/span-02/eval/данные'

Чтобы специально извлекать и обрабатывать данные с диапазоном «1», мы указываем конфигурацию диапазона в дополнение к входной конфигурации. Обратите внимание, что ExampleGen поддерживает только статические диапазоны с одним пролетом (чтобы указать обработку конкретных отдельных отрезков). Таким образом, для StaticRange start_span_number должен быть равен end_span_number. Используя предоставленный диапазон и информацию о ширине диапазона (если она предоставлена) для заполнения нулями, ExampleGen заменит спецификацию SPAN в предоставленных шаблонах разделения на желаемый номер диапазона. Пример использования показан ниже:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Конфигурацию диапазона также можно использовать для обработки конкретных дат, если вместо спецификации SPAN используется спецификация даты. Например, предположим, что есть входные данные, упорядоченные по календарной дате:

  • '/tmp/1970-01-02/поезд/данные'
  • '/tmp/1970-01-02/eval/данные'
  • '/tmp/1970-01-03/поезд/данные'
  • '/tmp/1970-01-03/eval/данные'

Чтобы конкретно получить и обработать данные 2 января 1970 года, мы делаем следующее:

from  tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Пользовательский ПримерГен

Если доступные в настоящее время компоненты ExampleGen не соответствуют вашим потребностям, вы можете создать собственный ExampleGen, который позволит вам читать из разных источников данных или в разных форматах данных.

Файловая настройка ExampleGen (экспериментальная)

Во-первых, расширьте BaseExampleGenExecutor с помощью пользовательского Beam PTransform, который обеспечивает преобразование вашего разделения ввода train/eval в примеры TF. Например, исполнитель CsvExampleGen обеспечивает преобразование входного CSV-раздела в примеры TF.

Затем создайте компонент с вышеуказанным исполнителем, как это сделано в компоненте CsvExampleGen . В качестве альтернативы передайте пользовательский исполнитель в стандартный компонент ExampleGen, как показано ниже.

from tfx.components.base import executor_spec
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Теперь мы также поддерживаем чтение файлов Avro и Parquet с помощью этого метода .

Дополнительные форматы данных

Apache Beam поддерживает чтение ряда дополнительных форматов данных . через Beam I/O Transforms. Вы можете создавать собственные компоненты ExampleGen, используя Beam I/O Transforms, используя шаблон, аналогичный примеру Avro .

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))

На момент написания этой статьи поддерживаемые в настоящее время форматы и источники данных для Beam Python SDK включают:

  • Амазонка S3
  • Апач Авро
  • Апач Хадуп
  • Апач Кафка
  • Апач Паркет
  • Облако Google BigQuery
  • Облачная большая таблица Google
  • Облачное хранилище данных Google
  • Google Cloud Pub/Sub
  • Облачное хранилище Google (GCS)
  • MongoDB

Проверьте документы Beam для получения последнего списка.

Настройка ExampleGen на основе запросов (экспериментальная)

Во-первых, расширьте BaseExampleGenExecutor с помощью пользовательского Beam PTransform, который считывает данные из внешнего источника. Затем создайте простой компонент, расширив QueryBasedExampleGen.

Это может потребовать или не потребовать дополнительных конфигураций подключения. Например, исполнитель BigQuery читает с помощью коннектора beam.io по умолчанию, который абстрагирует детали конфигурации соединения. Исполнитель Presto требует в качестве входных данных пользовательского Beam PTransform и пользовательского protobuf конфигурации соединения .

Если для пользовательского компонента ExampleGen требуется конфигурация соединения, создайте новый protobuf и передайте его через custom_config, который теперь является необязательным параметром выполнения. Ниже приведен пример использования настроенного компонента.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Компоненты ExampleGen Downstream

Пользовательская конфигурация разделения поддерживается для нижестоящих компонентов.

СтатистикаGen

По умолчанию выполняется генерация статистики для всех сплитов.

Чтобы исключить какие-либо разбиения, установите exclude_splits для компонента StatisticsGen. Например:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchemaGen

Поведение по умолчанию заключается в создании схемы на основе всех разбиений.

Чтобы исключить любые разбиения, установите exclude_splits для компонента SchemaGen. Например:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

ПримерВалидатор

Поведение по умолчанию заключается в проверке статистики всех разделений входных примеров по схеме.

Чтобы исключить любые разбиения, установите exclude_splits для компонента ExampleValidator. Например:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Трансформировать

Поведение по умолчанию — анализ и создание метаданных из разделения «поезд» и преобразование всех разделений.

Чтобы указать расщепления для анализа и преобразования, установите splits_config для компонента Transform. Например:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Тренер и тюнер

Поведение по умолчанию — обучение на разбиении «обучение» и оценка на разбиении «eval».

Чтобы указать разделение поезда и оценить разделение, установите train_args и eval_args для компонента Trainer. Например:

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

Оценщик

Поведение по умолчанию предоставляет метрики, вычисленные для разделения 'eval'.

Чтобы вычислить оценочную статистику для пользовательских сплитов, установите example_splits для компонента Evaluator. Например:

# Compute metrics on the 'eval1' split and the 'eval2' split.
evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

Дополнительные сведения доступны в справочнике по API CsvExampleGen, справочнике по API FileBasedExampleGen и справочнике по API ImportExampleGen .