Dołącz do społeczności SIG TFX-Addons i pomóż ulepszyć TFX! Dołącz do dodatków SIG TFX

Komponent potoku TFX ExampleGen

Składnik ExampleGen TFX Pipeline pozyskuje dane do potoków TFX. Zużywa zewnętrzne pliki/usługi do generowania przykładów, które będą odczytywane przez inne komponenty TFX. Zapewnia również spójną i konfigurowalną partycję oraz tasuje zestaw danych zgodnie z najlepszymi praktykami ML.

  • Zużywa: dane z zewnętrznych źródeł danych, takich jak CSV, TFRecord , Avro, Parquet i BigQuery.
  • tf.Example : rekordy tf.Example rekordy tf.SequenceExample lub format proto, w zależności od formatu ładunku.

Przykładowa generacja i inne komponenty

ExampleGen dostarcza dane do komponentów, które korzystają z biblioteki TensorFlow Data Validation , takich jak SchemaGen , StatisticsGen i Example Validator . Dostarcza również dane do Transform , które wykorzystuje bibliotekę TensorFlow Transform , a ostatecznie do celów wdrożenia podczas wnioskowania.

Źródła danych i formaty

Obecnie standardowa instalacja TFX zawiera pełne komponenty ExampleGen dla tych źródeł danych i formatów:

Dostępne są również niestandardowe executory, które umożliwiają tworzenie komponentów ExampleGen dla tych źródeł i formatów danych:

Zobacz przykłady użycia w kodzie źródłowym i tej dyskusji, aby uzyskać więcej informacji na temat używania i rozwijania niestandardowych executorów.

Ponadto te źródła danych i formaty są dostępne jako przykłady komponentów niestandardowych :

Pozyskiwanie formatów danych obsługiwanych przez Apache Beam

Apache Beam obsługuje pozyskiwanie danych z szerokiego zakresu źródeł danych i formatów ( patrz poniżej ). Te możliwości można wykorzystać do tworzenia niestandardowych komponentów ExampleGen dla TFX, co jest demonstrowane przez niektóre istniejące komponenty ExampleGen ( patrz poniżej ).

Jak korzystać z komponentu ExampleGen

W przypadku obsługiwanych źródeł danych (obecnie pliki CSV, pliki TFRecord z tf.Example , tf.SequenceExample i proto oraz wyniki zapytań BigQuery) komponent potoku ExampleGen może być używany bezpośrednio we wdrożeniu i wymaga niewielkiego dostosowania. Na przykład:

example_gen = CsvExampleGen(input_base='data_root')

lub jak poniżej, aby importować zewnętrzny tf.Example bezpośrednio z tf.Example :

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Rozpiętość, wersja i podział

Span to grupa przykładów szkoleniowych. Jeśli dane są utrwalone w systemie plików, każdy Span może być przechowywany w osobnym katalogu. Semantyka Span nie jest zakodowana na stałe w TFX; Span może odpowiadać jednemu dniu danych, godzinie danych lub dowolnej innej grupie, która ma znaczenie dla Twojego zadania.

Każdy Span może przechowywać wiele wersji danych. Na przykład, jeśli usuniesz kilka przykładów z Span, aby wyczyścić dane o niskiej jakości, może to spowodować powstanie nowej wersji tego Span. Domyślnie komponenty TFX działają na najnowszej wersji w ramach Span.

Każda wersja w ramach Span może być dalej podzielona na wiele Splitów. Najczęstszym przypadkiem użycia dzielenia Span jest podzielenie go na dane uczące i ewaluacyjne.

Przęsła i podziały

Niestandardowy podział wejścia/wyjścia

Aby dostosować stosunek podziału pociąg/oszacowanie, który będzie wyprowadzany w programie output_config , ustaw output_config dla komponentu ExampleGen. Na przykład:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Zwróć uwagę, jak w tym przykładzie ustawiono hash_buckets .

Dla źródła wejściowego, które zostało już podzielone, ustaw input_config dla komponentu ExampleGen:


# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

Dla przykładowego gen opartego na plikach (np. CsvExampleGen i ImportExampleGen), pattern jest względnym wzorcem pliku glob, który mapuje do plików wejściowych z katalogiem głównym podanym przez wejściową ścieżkę bazową. Dla przykładowego genu opartego na zapytaniu (np. BigQueryExampleGen, PrestoExampleGen) pattern jest zapytaniem SQL.

Domyślnie cały wejściowy dir bazowy jest traktowany jako pojedynczy podział wejściowy, a podział wyjściowy i wyjściowy jest generowany w stosunku 2:1.

Proszę odnieść się do proto/example_gen.proto, aby zapoznać się z konfiguracją podziału wejścia i wyjścia w ExampleGen. I zapoznaj się z przewodnikiem po dalszych komponentach, aby wykorzystać niestandardowe podziały.

Metoda dzielenia

hash_buckets metodę dzielenia hash_buckets , zamiast całego rekordu, można wykorzystać funkcję partycjonowania przykładów. Jeśli funkcja jest obecna, ExampleGen użyje odcisku palca tej funkcji jako klucza partycji.

Ta funkcja może być używana do utrzymania stabilnego podziału z pewnymi właściwościami przykładów: na przykład użytkownik zawsze będzie umieszczany w tym samym podziale, jeśli jako nazwę funkcji partycji wybrano „identyfikator_użytkownika”.

Interpretacja znaczenia „cechy” i sposobu dopasowania „cechy” do określonej nazwy zależy od implementacji ExampleGen i typu przykładów.

Dla gotowych implementacji ExampleGen:

  • Jeśli generuje tf.Example, to „feature” oznacza wpis w tf.Example.features.feature.
  • Jeśli generuje tf.SequenceExample, to „feature” oznacza wpis w tf.SequenceExample.context.feature.
  • Obsługiwane są tylko funkcje int64 i bytes.

W następujących przypadkach ExampleGen zgłasza błędy w czasie wykonywania:

  • Określona nazwa funkcji nie istnieje w przykładzie.
  • Pusta funkcja: tf.train.Feature() .
  • Nieobsługiwane typy funkcji, np. funkcje pływające.

Aby wyprowadzić podział pociągu/oceny na podstawie funkcji w przykładach, ustaw output_config dla komponentu ExampleGen. Na przykład:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Wskazówki jak partition_feature_name została ustawiona w tym przykładzie.

Zakres

Span można pobrać za pomocą specyfikacji „{SPAN}” we wzorcu wejściowym glob :

  • Ta specyfikacja dopasowuje cyfry i mapuje dane na odpowiednie numery SPAN. Na przykład „data_{SPAN}-*.tfrecord” zbierze pliki takie jak „data_12-a.tfrecord”, „date_12-b.tfrecord”.
  • Opcjonalnie tę specyfikację można określić za pomocą szerokości liczb całkowitych podczas mapowania. Na przykład „data_{SPAN:2}.file” jest mapowany na pliki, takie jak „data_02.file” i „data_27.file” (jako dane wejściowe odpowiednio dla zakresu 2 i span-27), ale nie jest mapowany do pliku „data_1. plik” ani „data_123.plik”.
  • Gdy brakuje specyfikacji SPAN, zakłada się, że zawsze jest to wartość Span '0'.
  • Jeśli określono SPAN, potok przetworzy najnowszy zakres i zapisze numer zakresu w metadanych.

Załóżmy na przykład, że istnieją dane wejściowe:

  • '/tmp/span-1/pociąg/dane'
  • '/tmp/span-1/ewaluacja/dane'
  • '/tmp/span-2/pociąg/dane'
  • '/tmp/span-2/ewaluacja/dane'

a konfiguracja wejścia jest pokazana poniżej:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

podczas uruchamiania potoku przetwarza:

  • '/tmp/span-2/pociąg/dane' jako podział pociągu
  • '/tmp/span-2/eval/data' jako podział eval

z numerem rozpiętości jako „2”. Jeśli później '/tmp/span-3/...' są gotowe, po prostu uruchom potok ponownie i pobierze on span '3' do przetworzenia. Poniżej przedstawiono przykładowy kod do używania specyfikacji span:

06a98ffff30

Odzyskanie określonego zakresu można wykonać za pomocą RangeConfig, który jest szczegółowo opisany poniżej.

Data

Jeśli źródło danych jest zorganizowane w systemie plików według daty, TFX obsługuje mapowanie dat bezpośrednio na numery zakresu. Istnieją trzy specyfikacje reprezentujące mapowanie od dat do rozpiętości: {RRRR}, {MM} i {DD}:

  • Te trzy specyfikacje powinny być całkowicie obecne we wzorcu glob wejściowym, jeśli jakakolwiek jest określona:
  • Można określić wyłącznie specyfikację {SPAN} lub ten zestaw specyfikacji dat.
  • Wyliczana jest data kalendarzowa z rokiem od RRRR, miesiąc od MM i dzień miesiąca od DD, a następnie liczba rozpiętości jest obliczana jako liczba dni od epoki Uniksa (tj. 1970-01-01). Na przykład „log-{RRRR}{MM}{DD}.data” pasuje do pliku „log-19700101.data” i wykorzystuje go jako dane wejściowe dla Span-0, a „log-20170101.data” jako dane wejściowe dla Span-17167.
  • Jeśli ten zestaw specyfikacji dat zostanie określony, potok przetworzy ostatnią datę i zapisze odpowiedni numer zakresu w metadanych.

Załóżmy na przykład, że istnieją dane wejściowe uporządkowane według dat kalendarzowych:

  • '/tmp/1970-01-02/pociąg/dane'
  • '/tmp/1970-01-02/ocena/dane'
  • '/tmp/1970-01-03/pociąg/dane'
  • '/tmp/1970-01-03/ocena/dane'

a konfiguracja wejścia jest pokazana poniżej:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

podczas uruchamiania potoku przetwarza:

  • '/tmp/1970-01-03/pociąg/dane' jako podział pociągu
  • '/tmp/1970-01-03/eval/data' jako podział eval

z numerem rozpiętości jako „2”. Jeśli później '/tmp/1970-01-04/...' będą gotowe, po prostu uruchom potok ponownie, a przejmie on zakres '3' do przetworzenia. Poniżej przedstawiamy przykładowy kod do używania specyfikacji daty:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Wersja

Wersję można pobrać za pomocą specyfikacji „{VERSION}” we wzorcu wejściowym glob :

  • Ta specyfikacja dopasowuje cyfry i mapuje dane do odpowiednich numerów WERSJI w ramach SPAN. Należy pamiętać, że specyfikacja wersji może być używana w połączeniu ze specyfikacją zakresu lub daty.
  • Ta specyfikacja może być również opcjonalnie określona z szerokością w taki sam sposób, jak specyfikacja SPAN. np. 'span-{SPAN}/wersja-{VERSION:4}/data-*'.
  • Gdy brakuje specyfikacji VERSION, wersja jest ustawiona na Brak.
  • Jeśli określono zarówno SPAN, jak i VERSION, potok przetworzy najnowszą wersję dla najnowszego zakresu i zapisze numer wersji w metadanych.
  • Jeśli określono WERSJĘ, ale nie określono SPAN (lub daty), zostanie zgłoszony błąd.

Załóżmy na przykład, że istnieją dane wejściowe:

  • „/tmp/zakres-1/wer-1/pociąg/dane”
  • '/tmp/zakres-1/wer-1/ocena/dane'
  • '/tmp/span-2/wer-1/pociąg/dane'
  • '/tmp/zakres-2/wer-1/ocena/dane'
  • '/tmp/span-2/wer-2/pociąg/dane'
  • '/tmp/span-2/wer-2/ewaluacja/dane'

a konfiguracja wejścia jest pokazana poniżej:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

podczas uruchamiania potoku przetwarza:

  • „/tmp/span-2/ver-2/train/data” jako podział pociągu
  • '/tmp/span-2/ver-2/eval/data' jako podział eval

z numerem zakresu jako '2' i numerem wersji jako '2'. Jeśli później '/tmp/span-2/ver-3/...' będą gotowe, po prostu uruchom potok ponownie i pobierze on span '2' i wersję '3' do przetworzenia. Poniżej przedstawiono przykład kodu do korzystania ze specyfikacji wersji:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Konfiguracja zakresu

TFX obsługuje pobieranie i przetwarzanie określonego zakresu w opartej na plikach pliku ExampleGen przy użyciu konfiguracji zakresu, abstrakcyjnej konfiguracji używanej do opisywania zakresów dla różnych jednostek TFX. Aby pobrać określony zakres, ustaw range_config dla komponentu ExampleGen opartego na plikach. Załóżmy na przykład, że istnieją dane wejściowe:

  • '/tmp/span-01/pociąg/dane'
  • '/tmp/span-01/ocena/dane'
  • '/tmp/span-02/pociąg/dane'
  • '/tmp/span-02/ocena/dane'

Aby konkretnie pobrać i przetworzyć dane z rozpiętością '1', określamy konfigurację zakresu oprócz konfiguracji wejściowej. Należy zauważyć, że ExampleGen obsługuje tylko jednozakresowe zakresy statyczne (aby określić przetwarzanie określonych poszczególnych zakresów). Dlatego dla StaticRange, start_span_number musi być równy end_span_number. Korzystając z dostarczonego zakresu i informacji o szerokości zakresu (jeśli są podane) dla wypełnienia zerowego, ExampleGen zastąpi specyfikację SPAN w dostarczonych wzorcach podziału żądanym numerem zakresu. Przykład użycia pokazano poniżej:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Konfiguracji zakresu można również użyć do przetwarzania określonych dat, jeśli zamiast specyfikacji SPAN użyto specyfikacji daty. Załóżmy na przykład, że istnieją dane wejściowe uporządkowane według dat kalendarzowych:

  • '/tmp/1970-01-02/pociąg/dane'
  • '/tmp/1970-01-02/ocena/dane'
  • '/tmp/1970-01-03/pociąg/dane'
  • '/tmp/1970-01-03/ocena/dane'

Aby pobrać i przetworzyć dane w dniu 2 stycznia 1970 r., wykonujemy następujące czynności:

from  tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Przykład niestandardowyGen

Jeśli obecnie dostępne komponenty ExampleGen nie odpowiadają Twoim potrzebom, możesz utworzyć niestandardowy ExampleGen, który umożliwi odczytywanie z różnych źródeł danych lub w różnych formatach danych.

Dostosowywanie na podstawie pliku PrzykładGen (eksperymentalne)

Najpierw rozszerz BaseExampleGenExecutor o niestandardową PTransform Beam, która zapewnia konwersję z danych wejściowych pociągu/oceny podzielonych na przykłady TF. Na przykład executor CsvExampleGen zapewnia konwersję z wejściowego podzielonego pliku CSV na przykłady TF.

Następnie utwórz komponent z powyższym executorem, tak jak to zrobiono w komponencie CsvExampleGen . Alternatywnie przekaż niestandardowy executor do standardowego komponentu ExampleGen, jak pokazano poniżej.

from tfx.components.base import executor_spec
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Teraz obsługujemy również odczytywanie plików Avro i Parquet za pomocą tej metody .

Dodatkowe formaty danych

Apache Beam obsługuje odczytywanie wielu dodatkowych formatów danych . poprzez transformacje We/Wy wiązki. Możesz tworzyć niestandardowe komponenty ExampleGen, wykorzystując transformacje We/Wy wiązki przy użyciu wzorca podobnego do przykładu Avro

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))

W chwili pisania tego tekstu obecnie obsługiwane formaty i źródła danych dla pakietu Beam Python SDK obejmują:

  • Amazonka S3
  • Apache Avro
  • Apache Hadoop
  • Apache Kafka
  • Parkiet Apache
  • Google Cloud BigQuery
  • Google Cloud BigTable
  • Google Cloud Datastore
  • Pub/Sub w chmurze Google
  • Google Cloud Storage (GCS)
  • MongoDB

Najnowszą listę znajdziesz w dokumentacji Beam .

Dostosowywanie w oparciu o zapytania (eksperymentalne)

Najpierw rozszerz BaseExampleGenExecutor o niestandardową funkcję Beam PTransform, która odczytuje dane z zewnętrznego źródła danych. Następnie utwórz prosty składnik, rozszerzając QueryBasedExampleGen.

Może to wymagać dodatkowych konfiguracji połączenia lub nie. Na przykład executor BigQuery odczytuje przy użyciu domyślnego łącznika beam.io, który abstrahuje szczegóły konfiguracji połączenia. Executor Presto wymaga niestandardowego PTransform Beam i niestandardowego protokołu konfiguracji połączenia jako danych wejściowych.

Jeśli konfiguracja połączenia jest wymagana dla niestandardowego komponentu ExampleGen, utwórz nowy protobuf i przekaż go przez custom_config, który jest teraz opcjonalnym parametrem wykonywania. Poniżej znajduje się przykład użycia skonfigurowanego komponentu.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Przykładowe komponenty niższego rzędu

Niestandardowa konfiguracja podziału jest obsługiwana dla dalszych komponentów.

StatystykiGen

Domyślnym zachowaniem jest generowanie statystyk dla wszystkich podziałów.

Aby wykluczyć jakiekolwiek podziały, ustaw składnik exclude_splits dla komponentu StatisticsGen. Na przykład:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchematGen

Domyślnym zachowaniem jest generowanie schematu na podstawie wszystkich podziałów.

Aby wykluczyć jakiekolwiek podziały, ustaw właściwość exclude_splits dla składnika SchemaGen. Na przykład:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

Przykład Validator

Domyślnym zachowaniem jest sprawdzanie poprawności statystyk wszystkich podziałów na przykładach wejściowych względem schematu.

Aby wykluczyć jakiekolwiek podziały, ustaw exclude_splits dla komponentu ExampleValidator. Na przykład:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Przekształcać

Domyślne zachowanie polega na analizie i tworzeniu metadanych z podziału „pociąg” i przekształceniu wszystkich podziałów.

Aby określić podziały analizy i podziały transformacji, ustaw splits_config dla składnika Transform. Na przykład:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Trener i tuner

Domyślnym zachowaniem jest trenowanie w podziale „pociąg” i ocenianie w podziale „eval”.

Aby określić podziały pociągu i ocenić podziały, ustaw train_args i eval_args dla komponentu Trainer. Na przykład:

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

Ewaluator

Domyślnym zachowaniem jest dostarczanie metryk obliczonych na podstawie podziału „ocena”.

Aby obliczyć statystyki oceny dotyczące podziałów niestandardowych, ustaw example_splits dla składnika Evaluator. Na przykład:

# Compute metrics on the 'eval1' split and the 'eval2' split.
Trainer = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

Więcej szczegółów można znaleźć w dokumentacji API CsvExampleGen , FileBasedExampleGen API i ImportExampleGen API .