Il componente della pipeline ExampleGen TFX

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Il componente ExampleGen TFX Pipeline acquisisce i dati nelle pipeline TFX. Consuma file/servizi esterni per generare esempi che verranno letti da altri componenti TFX. Fornisce inoltre una partizione coerente e configurabile e mescola il set di dati per le migliori pratiche di ML.

  • Consuma: dati da origini dati esterne come CSV, TFRecord , Avro, Parquet e BigQuery.
  • Emette: record tf.Example , record tf.SequenceExample o formato proto, a seconda del formato del payload.

EsempioGen e altri componenti

ExampleGen fornisce dati ai componenti che utilizzano la libreria TensorFlow Data Validation , come SchemaGen , StatisticsGen e Example Validator . Fornisce inoltre dati a Transform , che utilizza la libreria TensorFlow Transform e, in definitiva, agli obiettivi di distribuzione durante l'inferenza.

Fonti di dati e formati

Attualmente un'installazione standard di TFX include componenti ExampleGen completi per queste origini dati e formati:

Sono inoltre disponibili esecutori personalizzati che consentono lo sviluppo di componenti ExampleGen per queste origini dati e formati:

Vedere gli esempi di utilizzo nel codice sorgente e questa discussione per ulteriori informazioni su come utilizzare e sviluppare esecutori personalizzati.

Inoltre, queste origini dati e formati sono disponibili come esempi di componenti personalizzati :

Ingestione di formati di dati supportati da Apache Beam

Apache Beam supporta l'acquisizione di dati da un'ampia gamma di origini dati e formati ( vedi sotto ). Queste funzionalità possono essere utilizzate per creare componenti ExampleGen personalizzati per TFX, come dimostrato da alcuni componenti ExampleGen esistenti ( vedi sotto ).

Come utilizzare un componente ExampleGen

Per le origini dati supportate (attualmente file CSV, file TFRecord con tf.Example , tf.SequenceExample e formato proto e risultati di query BigQuery) il componente della pipeline ExampleGen può essere utilizzato direttamente nella distribuzione e richiede poca personalizzazione. Per esempio:

example_gen = CsvExampleGen(input_base='data_root')

o come di seguito per importare direttamente TFRecord esterno con tf.Example :

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Intervallo, Versione e Dividi

Uno Span è un raggruppamento di esempi di formazione. Se i tuoi dati sono persistenti su un filesystem, ogni Span può essere archiviato in una directory separata. La semantica di uno Span non è codificata in TFX; uno Span può corrispondere a un giorno di dati, un'ora di dati o qualsiasi altro raggruppamento significativo per l'attività.

Ciascun intervallo può contenere più versioni di dati. Per fare un esempio, se si rimuovono alcuni esempi da uno Span per ripulire dati di scarsa qualità, ciò potrebbe comportare una nuova versione di tale Span. Per impostazione predefinita, i componenti TFX funzionano sulla versione più recente all'interno di un intervallo.

Ciascuna versione all'interno di un intervallo può essere ulteriormente suddivisa in più divisioni. Il caso d'uso più comune per dividere uno Span è dividerlo in dati di training e di valutazione.

Intervalli e divisioni

Divisione ingresso/uscita personalizzata

Per personalizzare il rapporto di divisione treno/valutazione che verrà prodotto da ExampleGen, impostare output_config per il componente ExampleGen. Per esempio:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Nota come gli hash_buckets sono stati impostati in questo esempio.

Per una sorgente di input che è già stata divisa, imposta input_config per il componente ExampleGen:


# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

Per esempio basato su file gen (ad es. CsvExampleGen e ImportExampleGen), pattern è un pattern di file relativo glob che esegue il mapping ai file di input con la directory radice data dal percorso di base di input. Per la generazione di esempio basata su query (ad es. BigQueryExampleGen, PrestoExampleGen), pattern è una query SQL.

Per impostazione predefinita, l'intera directory di base dell'input viene trattata come una singola suddivisione dell'input e la suddivisione del treno e dell'uscita di valutazione viene generata con un rapporto 2:1.

Fare riferimento a proto/example_gen.proto per la configurazione della divisione di input e output di ExampleGen. E fare riferimento alla guida ai componenti a valle per l'utilizzo delle suddivisioni personalizzate a valle.

Metodo di scissione

Quando si utilizza il metodo di divisione hash_buckets , invece dell'intero record, è possibile utilizzare una funzione per partizionare gli esempi. Se è presente una funzionalità, ExampleGen utilizzerà un'impronta digitale di tale funzionalità come chiave di partizione.

Questa funzione può essere utilizzata per mantenere una divisione stabile rispetto a determinate proprietà degli esempi: ad esempio, un utente verrà sempre inserito nella stessa divisione se "id_utente" è stato selezionato come nome della funzione di partizione.

L'interpretazione di cosa significa una "caratteristica" e come abbinare una "caratteristica" con il nome specificato dipende dall'implementazione di ExampleGen e dal tipo di esempi.

Per le implementazioni di ExampleGen già pronte:

  • Se genera tf.Example, una "feature" indica una voce in tf.Example.features.feature.
  • Se genera tf.SequenceExample, una "feature" indica una voce in tf.SequenceExample.context.feature.
  • Sono supportate solo le funzionalità int64 e bytes.

Nei seguenti casi, ExampleGen genera errori di runtime:

  • Il nome della funzione specificata non esiste nell'esempio.
  • Funzione vuota: tf.train.Feature() .
  • Tipi di funzionalità non supportati, ad esempio funzionalità float.

Per generare la suddivisione treno/valutazione in base a una funzionalità negli esempi, impostare output_config per il componente ExampleGen. Per esempio:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Nota come in questo esempio è stato impostato partition_feature_name .

intervallo

Lo span può essere recuperato utilizzando la specifica '{SPAN}' nel pattern glob di input :

  • Questa specifica corrisponde alle cifre e mappa i dati nei numeri SPAN pertinenti. Ad esempio, 'data_{SPAN}-*.tfrecord' raccoglierà file come 'data_12-a.tfrecord', 'date_12-b.tfrecord'.
  • Facoltativamente, questa specifica può essere specificata con la larghezza degli interi quando viene mappata. Ad esempio, 'data_{SPAN:2}.file' esegue il mapping a file come 'data_02.file' e 'data_27.file' (come input rispettivamente per Span-2 e Span-27), ma non esegue il mapping a 'data_1. file' né 'data_123.file'.
  • Quando la specifica SPAN è mancante, si presume che sia sempre Span '0'.
  • Se viene specificato SPAN, la pipeline elaborerà l'ultimo intervallo e memorizzerà il numero dell'intervallo nei metadati.

Ad esempio, supponiamo che ci siano dati di input:

  • '/tmp/span-1/treno/dati'
  • '/tmp/span-1/eval/data'
  • '/tmp/span-2/train/data'
  • '/tmp/span-2/valutazione/dati'

e la configurazione di input è mostrata come di seguito:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

quando si attiva la pipeline, elaborerà:

  • '/tmp/span-2/train/data' come suddivisione del treno
  • '/tmp/span-2/eval/data' come suddivisione di valutazione

con numero di intervallo come '2'. Se in seguito '/tmp/span-3/...' sono pronti, è sufficiente attivare nuovamente la pipeline e raccoglierà lo span '3' per l'elaborazione. Di seguito viene mostrato l'esempio di codice per l'utilizzo delle specifiche span:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Il recupero di un determinato intervallo può essere eseguito con RangeConfig, descritto in dettaglio di seguito.

Data

Se l'origine dati è organizzata su filesystem per data, TFX supporta la mappatura delle date direttamente per estendere i numeri. Esistono tre specifiche per rappresentare la mappatura dalle date agli intervalli: {AAAA}, {MM} e {GG}:

  • Le tre specifiche dovrebbero essere del tutto presenti nel pattern glob di input se ne viene specificato uno:
  • È possibile specificare esclusivamente la specifica {SPAN} o questo set di specifiche della data.
  • Viene calcolata una data di calendario con l'anno a partire da AAAA, il mese a partire da MM e il giorno del mese a partire da GG, quindi il numero di intervallo viene calcolato come il numero di giorni trascorsi dall'epoca unix (es. 01-01-1970). Ad esempio, "log-{YYYY}{MM}{DD}.data" corrisponde a un file "log-19700101.data" e lo utilizza come input per Span-0 e "log-20170101.data" come input per Intervallo-17167.
  • Se viene specificato questo set di specifiche della data, la pipeline elaborerà l'ultima data più recente e memorizzerà il numero di intervallo corrispondente nei metadati.

Ad esempio, supponiamo che ci siano dati di input organizzati per data di calendario:

  • '/tmp/1970-01-02/treno/dati'
  • '/tmp/02-01-1970/eval/data'
  • '/tmp/1970-01-03/treno/dati'
  • '/tmp/1970-01-03/eval/data'

e la configurazione di input è mostrata come di seguito:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

quando si attiva la pipeline, elaborerà:

  • '/tmp/1970-01-03/train/data' come suddivisione del treno
  • '/tmp/1970-01-03/eval/data' come suddivisione di valutazione

con numero di intervallo come '2'. Se in seguito '/tmp/1970-01-04/...' sono pronti, è sufficiente attivare nuovamente la pipeline e raccoglierà l'intervallo '3' per l'elaborazione. Di seguito viene mostrato l'esempio di codice per l'utilizzo delle specifiche della data:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Versione

La versione può essere recuperata utilizzando la specifica '{VERSION}' nel pattern glob di input :

  • Questa specifica corrisponde alle cifre e mappa i dati ai numeri di VERSIONE pertinenti sotto lo SPAN. Si noti che le specifiche della versione possono essere utilizzate in combinazione con le specifiche Span o Date.
  • Questa specifica può anche essere opzionalmente specificata con la larghezza allo stesso modo della specifica SPAN. ad esempio 'span-{SPAN}/version-{VERSION:4}/data-*'.
  • Quando la specifica VERSION è mancante, la versione è impostata su None.
  • Se SPAN e VERSION sono entrambi specificati, la pipeline elaborerà la versione più recente per l'intervallo più recente e memorizzerà il numero di versione nei metadati.
  • Se viene specificata VERSION, ma non SPAN (o specifica della data), verrà generato un errore.

Ad esempio, supponiamo che ci siano dati di input:

  • '/tmp/span-1/ver-1/train/data'
  • '/tmp/span-1/ver-1/eval/data'
  • '/tmp/span-2/ver-1/train/data'
  • '/tmp/span-2/ver-1/eval/data'
  • '/tmp/span-2/ver-2/train/data'
  • '/tmp/span-2/ver-2/eval/data'

e la configurazione di input è mostrata come di seguito:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

quando si attiva la pipeline, elaborerà:

  • '/tmp/span-2/ver-2/train/data' come suddivisione del treno
  • '/tmp/span-2/ver-2/eval/data' come suddivisione di valutazione

con il numero di span come '2' e il numero di versione come '2'. Se in seguito '/tmp/span-2/ver-3/...' sono pronti, è sufficiente attivare nuovamente la pipeline e raccoglierà lo span '2' e la versione '3' per l'elaborazione. Di seguito viene mostrato l'esempio di codice per l'utilizzo delle specifiche della versione:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Config. intervallo

TFX supporta il recupero e l'elaborazione di un intervallo specifico in ExampleGen basato su file utilizzando la configurazione dell'intervallo, una configurazione astratta utilizzata per descrivere gli intervalli per diverse entità TFX. Per recuperare un intervallo specifico, imposta range_config per un componente ExampleGen basato su file. Ad esempio, supponiamo che ci siano dati di input:

  • '/tmp/span-01/train/data'
  • '/tmp/span-01/eval/data'
  • '/tmp/span-02/train/data'
  • '/tmp/span-02/eval/data'

Per recuperare ed elaborare in modo specifico i dati con intervallo '1', specifichiamo una configurazione di intervallo oltre alla configurazione di input. Si noti che ExampleGen supporta solo intervalli statici a intervallo singolo (per specificare l'elaborazione di intervalli individuali specifici). Pertanto, per StaticRange, start_span_number deve essere uguale a end_span_number. Utilizzando l'intervallo fornito e le informazioni sulla larghezza dell'intervallo (se fornite) per il riempimento zero, ExampleGen sostituirà la specifica SPAN nei modelli di divisione forniti con il numero di intervallo desiderato. Di seguito un esempio di utilizzo:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

La configurazione dell'intervallo può essere utilizzata anche per elaborare date specifiche, se viene utilizzata la specifica della data anziché la specifica SPAN. Ad esempio, supponiamo che ci siano dati di input organizzati per data di calendario:

  • '/tmp/1970-01-02/treno/dati'
  • '/tmp/02-01-1970/eval/data'
  • '/tmp/1970-01-03/treno/dati'
  • '/tmp/1970-01-03/eval/data'

Per recuperare ed elaborare specificamente i dati del 2 gennaio 1970, facciamo quanto segue:

from  tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Esempio personalizzatoGen

Se i componenti ExampleGen attualmente disponibili non soddisfano le tue esigenze, puoi creare un ExampleGen personalizzato, che ti consentirà di leggere da diverse origini dati o in diversi formati di dati.

Esempio basato su filePersonalizzazione Gen (sperimentale)

Innanzitutto, estendi BaseExampleGenExecutor con un Beam PTransform personalizzato, che fornisce la conversione dalla suddivisione dell'input train/eval in esempi TF. Ad esempio, l' esecutore CsvExampleGen fornisce la conversione da una divisione CSV di input in esempi TF.

Quindi, crea un componente con l'executor precedente, come fatto in CsvExampleGen component . In alternativa, passa un executor personalizzato nel componente standard ExampleGen come mostrato di seguito.

from tfx.components.base import executor_spec
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Ora supportiamo anche la lettura di file Avro e Parquet utilizzando questo metodo .

Formati dati aggiuntivi

Apache Beam supporta la lettura di numerosi formati di dati aggiuntivi . tramite le trasformazioni di I/O Beam. Puoi creare componenti ExampleGen personalizzati sfruttando le trasformazioni di I/O di Beam utilizzando un modello simile all'esempio di Avro

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))

Al momento della stesura di questo documento, i formati e le origini dati attualmente supportati per Beam Python SDK includono:

  • Amazon S3
  • Apache Avro
  • Apache Hadoop
  • Apache Kafka
  • Parquet Apache
  • Google Cloud BigQuery
  • Google Cloud BigTable
  • Google Cloud Datastore
  • Pubblico/Sub di Google Cloud
  • Archiviazione cloud di Google (GCS)
  • MongoDB

Controlla i documenti di Beam per l'elenco più recente.

Esempio basato su queryPersonalizzazione Gen (sperimentale)

Innanzitutto, estendi BaseExampleGenExecutor con un Beam PTransform personalizzato, che legge dall'origine dati esterna. Quindi, crea un componente semplice estendendo QueryBasedExampleGen.

Ciò potrebbe richiedere o meno configurazioni di connessione aggiuntive. Ad esempio, l' esecutore BigQuery legge utilizzando un connettore beam.io predefinito, che astrae i dettagli di configurazione della connessione. L' esecutore Presto richiede un Beam PTransform personalizzato e un protobuf di configurazione della connessione personalizzato come input.

Se è richiesta una configurazione di connessione per un componente ExampleGen personalizzato, creare un nuovo protobuf e passarlo tramite custom_config, che ora è un parametro di esecuzione facoltativo. Di seguito è riportato un esempio di come utilizzare un componente configurato.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Componenti a valle di esempioGen

La configurazione divisa personalizzata è supportata per i componenti a valle.

StatisticheGen

Il comportamento predefinito consiste nell'eseguire la generazione di statistiche per tutte le divisioni.

Per escludere eventuali suddivisioni, impostare exclude_splits per il componente StatisticsGen. Per esempio:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

Schema Gen

Il comportamento predefinito consiste nel generare uno schema basato su tutte le suddivisioni.

Per escludere eventuali suddivisioni, impostare exclude_splits per il componente SchemaGen. Per esempio:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

EsempioConvalida

Il comportamento predefinito consiste nel convalidare le statistiche di tutte le suddivisioni sugli esempi di input rispetto a uno schema.

Per escludere eventuali suddivisioni, impostare exclude_splits per il componente ExampleValidator. Per esempio:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Trasformare

Il comportamento predefinito è analizzare e produrre i metadati dalla divisione "treno" e trasformare tutte le divisioni.

Per specificare le suddivisioni di analisi e di trasformazione, impostare splits_config per il componente Trasforma. Per esempio:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Allenatore e sintonizzatore

Il comportamento predefinito è addestrare sulla divisione 'treno' e valutare sulla divisione 'eval'.

Per specificare le suddivisioni del treno e valutare le suddivisioni, impostare train_args e eval_args per il componente Trainer. Per esempio:

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

Valutatore

Il comportamento predefinito è fornire le metriche calcolate sulla divisione 'eval'.

Per calcolare le statistiche di valutazione sulle suddivisioni personalizzate, imposta example_splits per il componente Evaluator. Per esempio:

# Compute metrics on the 'eval1' split and the 'eval2' split.
evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

Ulteriori dettagli sono disponibili nel riferimento API CsvExampleGen , nell'implementazione API FileBasedExampleGen e nel riferimento API ImportExampleGen .