Unisciti alla comunità SIG TFX-Addons e contribuisci a rendere TFX ancora migliore!

Il componente Pipeline TFX di ExampleGen

Il componente ExampleGen TFX Pipeline inserisce i dati nelle pipeline TFX. Utilizza file/servizi esterni per generare esempi che verranno letti da altri componenti TFX. Fornisce inoltre una partizione coerente e configurabile e mescola il set di dati per le migliori pratiche di ML.

  • Consuma: dati da origini dati esterne come CSV, TFRecord , Avro, Parquet e BigQuery.
  • Emette: record tf.Example , record tf.SequenceExample o formato proto, a seconda del formato del payload.

EsempioGen e altri componenti

ExampleGen fornisce dati ai componenti che utilizzano la libreria TensorFlow Data Validation , come SchemaGen , StatisticsGen e Example Validator . Fornisce inoltre dati a Transform , che utilizza la libreria TensorFlow Transform e, infine, agli obiettivi di distribuzione durante l'inferenza.

Fonti e formati di dati

Attualmente un'installazione standard di TFX include componenti ExampleGen completi per queste origini dati e formati:

Sono inoltre disponibili executor personalizzati che consentono lo sviluppo di componenti ExampleGen per queste origini dati e formati:

Vedere gli esempi di utilizzo nel codice sorgente e questa discussione per ulteriori informazioni su come utilizzare e sviluppare esecutori personalizzati.

Inoltre, queste origini dati e formati sono disponibili come esempi di componenti personalizzati :

Acquisizione di formati di dati supportati da Apache Beam

Apache Beam supporta l'acquisizione di dati da un'ampia gamma di origini dati e formati ( vedi sotto ). Queste funzionalità possono essere utilizzate per creare componenti ExampleGen personalizzati per TFX, come dimostrato da alcuni componenti ExampleGen esistenti ( vedi sotto ).

Come usare un componente ExampleGen

Per le origini dati supportate (attualmente, file CSV, file TFRecord con formato tf.Example , tf.SequenceExample e proto e risultati delle query BigQuery) il componente pipeline di ExampleGen può essere utilizzato direttamente nella distribuzione e richiede poca personalizzazione. Per esempio:

example_gen = CsvExampleGen(input_base='data_root')

o come sotto per importare TFRecord esterno con tf.Example direttamente:

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Span, versione e split

Uno Span è un raggruppamento di esempi di formazione. Se i tuoi dati sono persistenti su un filesystem, ogni Span può essere archiviato in una directory separata. La semantica di uno Span non è codificata in TFX; uno Span può corrispondere a un giorno di dati, un'ora di dati o qualsiasi altro raggruppamento significativo per l'attività.

Ogni span può contenere più versioni di dati. Per fare un esempio, se rimuovi alcuni esempi da uno Span per ripulire i dati di scarsa qualità, questo potrebbe comportare una nuova versione di quello Span. Per impostazione predefinita, i componenti TFX operano sull'ultima versione all'interno di un intervallo.

Ciascuna versione all'interno di un intervallo può essere ulteriormente suddivisa in più suddivisioni. Il caso d'uso più comune per suddividere un intervallo è suddividerlo in dati di addestramento e di valutazione.

Campate e divisioni

Divisione ingresso/uscita personalizzataoutput

Per personalizzare il rapporto di suddivisione train/eval che verrà emesso da ExampleGen, imposta output_config per il componente ExampleGen. Per esempio:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Nota come hash_buckets stati impostati hash_buckets in questo esempio.

Per una sorgente di input che è già stata divisa, imposta input_config per il componente ExampleGen:


# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

Per la generazione di esempio basata su file (ad es. CsvExampleGen e ImportExampleGen), pattern è un modello di file relativo al glob che mappa i file di input con la directory radice fornita dal percorso di base di input. Per la generazione di esempi basata su query (ad es. BigQueryExampleGen, PrestoExampleGen), pattern è una query SQL.

Per impostazione predefinita, l'intera directory di base di input viene trattata come una singola suddivisione di input e la suddivisione di output train e eval viene generata con un rapporto 2:1.

Fare riferimento a proto/example_gen.proto per la configurazione di input e output split di ExampleGen. E fare riferimento alla guida ai componenti a valle per l'utilizzo delle suddivisioni personalizzate a valle.

Metodo di divisione

Quando si utilizza il metodo di suddivisione hash_buckets , invece dell'intero record, è possibile utilizzare una funzionalità per il partizionamento degli esempi. Se è presente una funzionalità, ExampleGen utilizzerà un'impronta digitale di tale funzionalità come chiave di partizione.

Questa funzione può essere utilizzata per mantenere una suddivisione stabile rispetto a determinate proprietà degli esempi: ad esempio, un utente verrà sempre inserito nella stessa suddivisione se "user_id" è stato selezionato come nome della funzione di partizione.

L'interpretazione di cosa significa una "funzione" e come abbinare una "funzione" con il nome specificato dipende dall'implementazione di ExampleGen e dal tipo degli esempi.

Per implementazioni di ExampleGen già pronte:

  • Se genera tf.Example, allora una "feature" indica una voce in tf.Example.features.feature.
  • Se genera tf.SequenceExample, allora una "feature" indica una voce in tf.SequenceExample.context.feature.
  • Sono supportate solo le funzionalità int64 e byte.

Nei seguenti casi, ExampleGen genera errori di runtime:

Per generare la suddivisione train/eval in base a una funzionalità negli esempi, imposta output_config per il componente ExampleGen. Per esempio:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Notare come è stato impostato partition_feature_name in questo esempio.

span

Lo span può essere recuperato utilizzando la specifica '{SPAN}' nel pattern glob di input :

  • Questa specifica abbina le cifre e mappa i dati nei relativi numeri SPAN. Ad esempio, 'data_{SPAN}-*.tfrecord' raccoglierà file come 'data_12-a.tfrecord', 'date_12-b.tfrecord'.
  • Facoltativamente, questa specifica può essere specificata con la larghezza degli interi quando mappati. Ad esempio, 'data_{SPAN:2}.file' si associa a file come 'data_02.file' e 'data_27.file' (come input rispettivamente per Span-2 e Span-27), ma non viene mappato su 'data_1. file' né 'data_123.file'.
  • Quando la specifica SPAN è mancante, si presume che sia sempre Span '0'.
  • Se viene specificato SPAN, la pipeline elaborerà l'intervallo più recente e memorizzerà il numero di intervallo nei metadati.

Ad esempio, supponiamo che ci siano dati di input:

  • '/tmp/span-1/train/data'
  • '/tmp/span-1/eval/data'
  • '/tmp/span-2/train/data'
  • '/tmp/span-2/eval/data'

e la configurazione di input è mostrata come di seguito:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

quando si attiva la pipeline, elaborerà:

  • '/tmp/span-2/train/data' come suddivisione del treno
  • '/tmp/span-2/eval/data' come eval split

con numero di span come '2'. Se in seguito '/tmp/span-3/...' sono pronti, è sufficiente attivare nuovamente la pipeline e prenderà l'intervallo '3' per l'elaborazione. Di seguito viene mostrato l'esempio di codice per l'utilizzo delle specifiche span:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Il recupero di un determinato intervallo può essere eseguito con RangeConfig, descritto in dettaglio di seguito.

Data

Se la tua origine dati è organizzata su filesystem per data, TFX supporta la mappatura delle date direttamente su numeri intermedi. Esistono tre specifiche per rappresentare la mappatura dalle date agli intervalli: {AAAA}, {MM} e {GG}:

  • Le tre specifiche dovrebbero essere del tutto presenti nel modello glob di input, se specificato:
  • È possibile specificare esclusivamente la specifica {SPAN} o questo insieme di specifiche di data.
  • Viene calcolata una data di calendario con l'anno da AAAA, il mese da MM e il giorno del mese da GG, quindi il numero dell'intervallo viene calcolato come numero di giorni dall'epoca unix (es. 1970-01-01). Ad esempio, 'log-{YYYY}{MM}{DD}.data' corrisponde a un file 'log-19700101.data' e lo utilizza come input per Span-0 e 'log-20170101.data' come input per Span-17167.
  • Se viene specificato questo set di specifiche di data, la pipeline elaborerà l'ultima data più recente e memorizzerà il numero di intervallo corrispondente nei metadati.

Ad esempio, supponiamo che ci siano dati di input organizzati per data di calendario:

  • '/tmp/1970-01-02/treno/dati'
  • '/tmp/1970-01-02/eval/data'
  • '/tmp/1970-01-03/treno/dati'
  • '/tmp/1970-01-03/eval/data'

e la configurazione di input è mostrata come di seguito:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

quando si attiva la pipeline, elaborerà:

  • '/tmp/1970-01-03/train/data' come suddivisione del treno
  • '/tmp/1970-01-03/eval/data' come eval split

con numero di span come '2'. Se in seguito '/tmp/1970-01-04/...' sono pronti, è sufficiente attivare di nuovo la pipeline e prenderà l'intervallo '3' per l'elaborazione. Di seguito viene mostrato l'esempio di codice per l'utilizzo delle specifiche della data:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Versione

La versione può essere recuperata utilizzando la specifica '{VERSION}' nel pattern glob di input :

  • Questa specifica abbina le cifre e mappa i dati ai relativi numeri di VERSIONE sotto SPAN. Si noti che la specifica della versione può essere utilizzata in combinazione con la specifica dell'intervallo o della data.
  • Questa specifica può anche essere facoltativamente specificata con la larghezza allo stesso modo della specifica SPAN. ad esempio 'span-{SPAN}/version-{VERSION:4}/data-*'.
  • Quando la specifica VERSION è mancante, la versione è impostata su Nessuna.
  • Se SPAN e VERSION sono entrambi specificati, la pipeline elaborerà la versione più recente per l'intervallo più recente e memorizzerà il numero di versione nei metadati.
  • Se viene specificato VERSION, ma non SPAN (o data specifica), verrà generato un errore.

Ad esempio, supponiamo che ci siano dati di input:

  • '/tmp/span-1/ver-1/train/data'
  • '/tmp/span-1/ver-1/eval/data'
  • '/tmp/span-2/ver-1/train/data'
  • '/tmp/span-2/ver-1/eval/data'
  • '/tmp/span-2/ver-2/train/data'
  • '/tmp/span-2/ver-2/eval/data'

e la configurazione di input è mostrata come di seguito:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

quando si attiva la pipeline, elaborerà:

  • '/tmp/span-2/ver-2/train/data' come suddivisione del treno
  • '/tmp/span-2/ver-2/eval/data' come eval split

con numero di span come '2' e numero di versione come '2'. Se in seguito '/tmp/span-2/ver-3/...' sono pronti, è sufficiente attivare nuovamente la pipeline e prenderà lo span '2' e la versione '3' per l'elaborazione. Di seguito viene mostrato l'esempio di codice per l'utilizzo delle specifiche della versione:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Configurazione gamma

TFX supporta il recupero e l'elaborazione di un intervallo specifico in ExampleGen basato su file utilizzando range config, una configurazione astratta utilizzata per descrivere intervalli per diverse entità TFX. Per recuperare un intervallo specifico, imposta range_config per un componente ExampleGen basato su file. Ad esempio, supponiamo che ci siano dati di input:

  • '/tmp/span-01/train/data'
  • '/tmp/span-01/eval/data'
  • '/tmp/span-02/train/data'
  • '/tmp/span-02/eval/data'

Per recuperare ed elaborare specificamente i dati con span '1', specifichiamo una configurazione di intervallo oltre alla configurazione di input. Tieni presente che ExampleGen supporta solo intervalli statici a ampiezza singola (per specificare l'elaborazione di intervalli individuali specifici). Pertanto, per StaticRange, start_span_number deve essere uguale a end_span_number. Utilizzando l'intervallo fornito e le informazioni sulla larghezza dell'intervallo (se fornite) per il riempimento zero, ExampleGen sostituirà la specifica SPAN nei modelli di suddivisione forniti con il numero di estensione desiderato. Di seguito è riportato un esempio di utilizzo:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

La configurazione dell'intervallo può essere utilizzata anche per elaborare date specifiche, se viene utilizzata la specifica della data anziché la specifica SPAN. Ad esempio, supponiamo che ci siano dati di input organizzati per data di calendario:

  • '/tmp/1970-01-02/treno/dati'
  • '/tmp/1970-01-02/eval/data'
  • '/tmp/1970-01-03/treno/dati'
  • '/tmp/1970-01-03/eval/data'

Per recuperare ed elaborare specificamente i dati il ​​2 gennaio 1970, facciamo quanto segue:

from  tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Esempio personalizzato Gen

Se i componenti ExampleGen attualmente disponibili non soddisfano le tue esigenze, puoi creare un ExampleGen personalizzato, che ti consentirà di leggere da diverse origini dati o in diversi formati di dati.

Esempio basato su file Personalizzazione Gen (sperimentale)

Innanzitutto, estendi BaseExampleGenExecutor con un Beam PTransform personalizzato, che fornisce la conversione dalla suddivisione dell'input treno/valutazione agli esempi TF. Ad esempio, l' esecutore CsvExampleGen fornisce la conversione da una suddivisione CSV di input in esempi TF.

Quindi, crea un componente con l'executor sopra, come fatto nel componente CsvExampleGen . In alternativa, passa un executor personalizzato nel componente standard ExampleGen come mostrato di seguito.

from tfx.components.base import executor_spec
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Ora, supportiamo anche la lettura di file Avro e Parquet utilizzando questo metodo .

Formati di dati aggiuntivi

Apache Beam supporta la lettura di numerosi formati di dati aggiuntivi . tramite trasformazioni di I/O del fascio. È possibile creare componenti di ExampleGen personalizzati sfruttando le trasformazioni di I/O Beam utilizzando un modello simile all'esempio di Avro

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))

Al momento della stesura di questo documento, i formati e le origini dati attualmente supportati per Beam Python SDK includono:

  • Amazon S3
  • Apache Avro
  • Apache Hadoop
  • Apache Kafka
  • Apache Parquet
  • Google Cloud BigQuery
  • Google Cloud BigTable
  • Google Cloud Datastore
  • Google Cloud Pub/Sub
  • Google Cloud Storage (GCS)
  • MongoDB

Controlla i documenti di Beam per l'elenco più recente.

Esempio di personalizzazione Gen basata su query (sperimentale)

Innanzitutto, estendi BaseExampleGenExecutor con un Beam PTransform personalizzato, che legge dall'origine dati esterna. Quindi, crea un componente semplice estendendo QueryBasedExampleGen.

Ciò potrebbe richiedere o meno configurazioni di connessione aggiuntive. Ad esempio, l' executor BigQuery legge utilizzando un connettore beam.io predefinito, che astrae i dettagli di configurazione della connessione. L' executor Presto richiede un Beam PTransform personalizzato e un protocollo di configurazione della connessione personalizzato come input.

Se è richiesta una configurazione di connessione per un componente ExampleGen personalizzato, crea un nuovo protobuf e passalo tramite custom_config, che ora è un parametro di esecuzione facoltativo. Di seguito è riportato un esempio di come utilizzare un componente configurato.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Esempio di componenti di generazione a valle

La configurazione divisa personalizzata è supportata per i componenti a valle.

StatisticheGen

Il comportamento predefinito consiste nell'eseguire la generazione di statistiche per tutte le divisioni.

Per escludere qualsiasi suddivisione, impostare il componente exclude_splits per StatisticsGen. Per esempio:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchemaGen

Il comportamento predefinito consiste nel generare uno schema basato su tutte le suddivisioni.

Per escludere qualsiasi suddivisione, impostare il componente exclude_splits per SchemaGen. Per esempio:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

Esempio Validator

Il comportamento predefinito consiste nel convalidare le statistiche di tutte le suddivisioni sugli esempi di input rispetto a uno schema.

Per escludere qualsiasi suddivisione, imposta il componente exclude_splits per ExampleValidator. Per esempio:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Trasformare

Il comportamento predefinito è analizzare e produrre i metadati dalla suddivisione 'train' e trasformare tutte le suddivisioni.

Per specificare le suddivisioni di analisi e trasformazione, impostare splits_config per il componente Trasforma. Per esempio:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Allenatore e sintonizzatore

Il comportamento predefinito è addestrare sulla divisione 'train' e valutare sulla divisione 'valutazione'.

Per specificare le suddivisioni del treno e valutare le suddivisioni, impostare train_args ed eval_args per il componente Trainer. Per esempio:

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

valutatore

Il comportamento predefinito è fornire metriche calcolate sulla divisione "valutazione".

Per calcolare una statistica di valutazione sulle suddivisioni personalizzate, impostare il componente example_splits per Evaluator. Per esempio:

# Compute metrics on the 'eval1' split and the 'eval2' split.
Trainer = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

Maggiori dettagli sono disponibili nel riferimento API CsvExampleGen , riferimento API FileBasedExampleGen e riferimento API ImportExampleGen .