Esta página foi traduzida pela API Cloud Translation.
Switch to English

O componente de pipeline do TFX ExampleGen

O componente ExampleGen TFX Pipeline ingere dados em pipelines TFX. Ele consome arquivos / serviços externos para gerar exemplos que serão lidos por outros componentes TFX. Ele também fornece partição consistente e configurável e embaralha o conjunto de dados para as práticas recomendadas de ML.

  • Consome: dados de fontes externas, como CSV, TFRecord , Avro, Parquet e BigQuery.
  • Emite: registros tf.Example , registros tf.SequenceExample ou formato proto, dependendo do formato da carga útil.

ExampleGen e outros componentes

ExampleGen fornece dados para componentes que usam a biblioteca TensorFlow Data Validation , como SchemaGen , StatisticsGen e Example Validator . Ele também fornece dados para o Transform , que usa a biblioteca TensorFlow Transform e, por fim, para os destinos de implantação durante a inferência.

Fontes de dados e formatos

Atualmente, uma instalação padrão do TFX inclui componentes ExampleGen completos para essas fontes e formatos de dados:

Também estão disponíveis executores personalizados que permitem o desenvolvimento de componentes ExampleGen para essas fontes e formatos de dados:

Consulte os exemplos de uso no código-fonte e esta discussão para obter mais informações sobre como usar e desenvolver executores personalizados.

Além disso, essas fontes de dados e formatos estão disponíveis como exemplos de componentes personalizados :

Formatos de ingestão de dados que são suportados pelo Apache Beam

O Apache Beam suporta a ingestão de dados de uma ampla variedade de fontes e formatos de dados , incluindo arquivos, Avro, texto, tf.Record, Parquet, S3, GCS, Hadoop, Kafka, PubSub, BigQuery, BigTable, Datastore, Mongo e Flink. Esses recursos podem ser usados ​​para criar componentes ExampleGen para TFX, o que é demonstrado por alguns componentes ExampleGen existentes, como Avro

Como usar um componente ExampleGen

Para fontes de dados compatíveis (atualmente, arquivos CSV, arquivos TFRecord com tf.Example , tf.SequenceExample e formato proto e resultados de consultas do BigQuery), o componente de pipeline ExampleGen pode ser usado diretamente na implantação e requer pouca personalização. Por exemplo:

from tfx.utils.dsl_utils import csv_input
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen

examples = csv_input(os.path.join(base_dir, 'data/simple'))
example_gen = CsvExampleGen(input=examples)

ou como abaixo para importar TFRecord externo com tf.Example diretamente:

from tfx.utils.dsl_utils import tfrecord_input
from tfx.components.example_gen.import_example_gen.component import ImportExampleGen

examples = tfrecord_input(path_to_tfrecord_dir)
example_gen = ImportExampleGen(input=examples)

Span, Version e Split

Um Span é um agrupamento de exemplos de treinamento. Se seus dados persistirem em um sistema de arquivos, cada Span pode ser armazenado em um diretório separado. A semântica de um Span não é codificada no TFX; um Span pode corresponder a um dia de dados, uma hora de dados ou qualquer outro agrupamento que seja significativo para sua tarefa.

Cada Span pode conter várias versões de dados. Para dar um exemplo, se você remover alguns exemplos de um Span para limpar dados de baixa qualidade, isso pode resultar em uma nova versão desse Span. Por padrão, os componentes TFX operam na versão mais recente dentro de um Span.

Cada versão dentro de um Span pode ser subdividida em várias Divisões. O caso de uso mais comum para dividir um Span é dividi-lo em dados de treinamento e avaliação.

Spans and Splits

Divisão de entrada / saída personalizada

Para personalizar a proporção de divisão de treinamento / avaliação que ExampleGen produzirá, defina output_config para o componente ExampleGen. Por exemplo:

from  tfx.proto import example_gen_pb2

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
examples = csv_input(input_dir)
example_gen = CsvExampleGen(input=examples, output_config=output)

Observe como os hash_buckets foram definidos neste exemplo.

Para uma fonte de entrada que já foi dividida, defina o input_config para o componente ExampleGen:

from  tfx.proto import example_gen_pb2

# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = example_gen_pb2.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
examples = csv_input(input_dir)
example_gen = CsvExampleGen(input=examples, input_config=input)

Para o exemplo baseado em arquivo gen (por exemplo, CsvExampleGen e ImportExampleGen), o pattern é um padrão de arquivo relativo glob que mapeia para arquivos de entrada com diretório raiz fornecido pelo caminho de base de entrada. Para gen de exemplo baseado em consulta (por exemplo, BigQueryExampleGen, PrestoExampleGen), o pattern é uma consulta SQL.

Por padrão, todo o diretório base de entrada é tratado como uma única divisão de entrada, e a divisão de saída de trem e avaliação é gerada com uma proporção de 2: 1.

Consulte proto / example_gen.proto para a configuração de divisão de entrada e saída de ExampleGen. E consulte o guia de componentes downstream para utilizar as divisões downstream personalizadas.

Método de Divisão

Ao usar o método de divisão hash_buckets , em vez de todo o registro, pode-se usar um recurso para particionar os exemplos. Se um recurso estiver presente, ExampleGen usará uma impressão digital desse recurso como a chave de partição.

Este recurso pode ser usado para manter uma divisão estável em certas propriedades de exemplos: por exemplo, um usuário sempre será colocado na mesma divisão se "user_id" for selecionado como o nome do recurso de partição.

A interpretação do que significa um "recurso" e como fazer a correspondência de um "recurso" com o nome especificado depende da implementação de ExampleGen e do tipo dos exemplos.

Para implementações ExampleGen prontas:

  • Se ele gerar tf.Example, um "recurso" significa uma entrada em tf.Example.features.feature.
  • Se ele gerar tf.SequenceExample, um "recurso" significa uma entrada em tf.SequenceExample.context.feature.
  • Apenas os recursos int64 e bytes são suportados.

Nos casos a seguir, ExampleGen gera erros de tempo de execução:

  • O nome do recurso especificado não existe no exemplo.
  • Recurso vazio: tf.train.Feature() .
  • Tipos de recursos não suportados, por exemplo, recursos flutuantes.

Para gerar a divisão trem / avaliação com base em um recurso dos exemplos, defina output_config para o componente ExampleGen. Por exemplo:

from  tfx.proto import example_gen_pb2

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
examples = csv_input(input_dir)
example_gen = CsvExampleGen(input=examples, output_config=output)

Observe como o partition_feature_name foi definido neste exemplo.

Período

O intervalo pode ser recuperado usando a especificação '{SPAN}' no padrão glob de entrada :

  • Esta especificação corresponde a dígitos e mapeia os dados nos números de SPAN relevantes. Por exemplo, 'data_ {SPAN} - *. Tfrecord' irá coletar arquivos como 'data_12-a.tfrecord', 'date_12-b.tfrecord'.
  • Opcionalmente, esta especificação pode ser especificada com a largura dos inteiros quando mapeada. Por exemplo, 'data_ {SPAN: 2} .file' mapeia para arquivos como 'data_02.file' e 'data_27.file' (como entradas para Span-2 e Span-27 respectivamente), mas não mapeia para 'data_1. arquivo 'nem' data_123.file '.
  • Quando a especificação de SPAN está ausente, é considerado sempre Span '0'.
  • Se SPAN for especificado, o pipeline processará o intervalo mais recente e armazenará o número do intervalo nos metadados.

Por exemplo, vamos supor que haja dados de entrada:

  • '/ tmp / span-1 / train / data'
  • '/ tmp / span-1 / eval / data'
  • '/ tmp / span-2 / train / data'
  • '/ tmp / span-2 / eval / data'

e a configuração de entrada é mostrada como abaixo:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

ao acionar o pipeline, ele processará:

  • '/ tmp / span-2 / train / data' como divisão de trem
  • '/ tmp / span-2 / eval / data' como divisão de avaliação

com o número do intervalo como '2'. Se mais tarde '/ tmp / span-3 / ...' estiverem prontos, simplesmente acione o pipeline novamente e ele pegará o span '3' para processamento. Abaixo mostra o exemplo de código para usar as especificações de amplitude:

from  tfx.proto import example_gen_pb2

input = example_gen_pb2.Input(splits=[
                example_gen_pb2.Input.Split(name='train',
                                            pattern='span-{SPAN}/train/*'),
                example_gen_pb2.Input.Split(name='eval',
                                            pattern='span-{SPAN}/eval/*')
            ])
examples = csv_input('/tmp')
example_gen = CsvExampleGen(input=examples, input_config=input)

A recuperação de um determinado intervalo pode ser feita com RangeConfig, que é detalhado a seguir.

Encontro

Se sua fonte de dados estiver organizada no sistema de arquivos por data, o TFX suporta o mapeamento de datas diretamente para números abrangidos. Existem três especificações para representar o mapeamento de datas a períodos: {AAAA}, {MM} e {DD}:

  • As três especificações devem estar presentes no padrão glob de entrada se alguma for especificada:
  • As especificações {SPAN} ou este conjunto de especificações de data podem ser especificadas exclusivamente.
  • Uma data do calendário com o ano de AAAA, o mês de MM e o dia do mês de DD é calculada, então o número do intervalo é calculado como o número de dias desde a época unix (ou seja, 1970-01-01). Por exemplo, 'log- {YYYY} {MM} {DD} .data' corresponde a um arquivo 'log-19700101.data' e o consome como entrada para Span-0, e 'log-20170101.data' como entrada para Span-17167.
  • Se este conjunto de especificações de data for especificado, o pipeline processará a última data mais recente e armazenará o número do intervalo correspondente nos metadados.

Por exemplo, vamos supor que haja dados de entrada organizados por data do calendário:

  • '/ tmp / 1970-01-02 / train / data'
  • '/ tmp / 1970-01-02 / eval / data'
  • '/ tmp / 1970-01-03 / train / data'
  • '/ tmp / 1970-01-03 / eval / data'

e a configuração de entrada é mostrada abaixo:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

ao acionar o pipeline, ele processará:

  • '/ tmp / 1970-01-03 / train / data' como divisão de trem
  • '/ tmp / 1970-01-03 / eval / data' como divisão de avaliação

com o número do intervalo como '2'. Se mais tarde '/ tmp / 1970-01-04 / ...' estiverem prontos, simplesmente acione o pipeline novamente e ele pegará o intervalo '3' para processamento. Abaixo mostra o exemplo de código para usar a especificação de data:

from  tfx.proto import example_gen_pb2

input = example_gen_pb2.Input(splits=[
                example_gen_pb2.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                example_gen_pb2.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
examples = csv_input('/tmp')
example_gen = CsvExampleGen(input=examples, input_config=input)

Versão

A versão pode ser recuperada usando a especificação '{VERSION}' no padrão glob de entrada :

  • Esta especificação corresponde a dígitos e mapeia os dados para os números de VERSÃO relevantes no SPAN. Observe que a especificação de versão pode ser usada em combinação com a especificação de período ou data.
  • Esta especificação também pode ser opcionalmente especificada com a largura da mesma maneira que a especificação de SPAN. por exemplo, 'span- {SPAN} / version- {VERSION: 4} / data- *'.
  • Quando a especificação VERSION está ausente, a versão é definida como Nenhum.
  • Se SPAN e VERSION forem especificados, o pipeline processará a versão mais recente para o span mais recente e armazenará o número da versão nos metadados.
  • Se VERSION for especificado, mas não SPAN (ou especificação de data), um erro será gerado.

Por exemplo, vamos supor que haja dados de entrada:

  • '/ tmp / span-1 / ver-1 / train / data'
  • '/ tmp / span-1 / ver-1 / eval / data'
  • '/ tmp / span-2 / ver-1 / train / data'
  • '/ tmp / span-2 / ver-1 / eval / data'
  • '/ tmp / span-2 / ver-2 / train / data'
  • '/ tmp / span-2 / ver-2 / eval / data'

e a configuração de entrada é mostrada como abaixo:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

ao acionar o pipeline, ele processará:

  • '/ tmp / span-2 / ver-2 / train / data' como divisão de trem
  • '/ tmp / span-2 / ver-2 / eval / data' como divisão de avaliação

com o número do intervalo como '2' e o número da versão como '2'. Se mais tarde '/ tmp / span-2 / ver-3 / ...' estiverem prontos, simplesmente acione o pipeline novamente e ele pegará o span '2' e a versão '3' para processamento. Abaixo mostra o exemplo de código para usar a especificação de versão:

from  tfx.proto import example_gen_pb2

input = example_gen_pb2.Input(splits=[
                example_gen_pb2.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                example_gen_pb2.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
examples = csv_input('/tmp')
example_gen = CsvExampleGen(input=examples, input_config=input)

Configuração de intervalo

TFX oferece suporte à recuperação e ao processamento de um intervalo específico em ExampleGen baseado em arquivo, usando a configuração de intervalo, uma configuração abstrata usada para descrever intervalos para diferentes entidades TFX. Para recuperar um intervalo específico, defina o range_config para um componente ExampleGen baseado em arquivo. Por exemplo, vamos supor que haja dados de entrada:

  • '/ tmp / span-01 / train / data'
  • '/ tmp / span-01 / eval / data'
  • '/ tmp / span-02 / train / data'
  • '/ tmp / span-02 / eval / data'

Para recuperar e processar dados especificamente com intervalo '1', especificamos uma configuração de intervalo além da configuração de entrada. Observe que ExampleGen só oferece suporte a intervalos estáticos de intervalo único (para especificar o processamento de intervalos individuais específicos). Portanto, para StaticRange, start_span_number deve ser igual a end_span_number. Usando a amplitude fornecida e as informações de largura de amplitude (se fornecidas) para preenchimento de zero, ExampleGen substituirá a especificação de SPAN nos padrões de divisão fornecidos pelo número de amplitude desejado. Um exemplo de uso é mostrado abaixo:

from  tfx.proto import example_gen_pb2
from  tfx.proto import range_config_pb2

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = example_gen_pb2.Input(splits=[
                example_gen_pb2.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                example_gen_pb2.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = range_config_pb2.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
examples = csv_input(input_dir)
example_gen = CsvExampleGen(input=examples, input_config=input,
                            range_config=range)

A configuração de intervalo também pode ser usada para processar datas específicas, se a especificação de data for usada em vez da especificação de SPAN. Por exemplo, vamos supor que haja dados de entrada organizados por data do calendário:

  • '/ tmp / 1970-01-02 / train / data'
  • '/ tmp / 1970-01-02 / eval / data'
  • '/ tmp / 1970-01-03 / train / data'
  • '/ tmp / 1970-01-03 / eval / data'

Para recuperar e processar dados especificamente em 2 de janeiro de 1970, fazemos o seguinte:

from  tfx.components.example_gen import utils
from  tfx.proto import example_gen_pb2
from  tfx.proto import range_config_pb2

input = example_gen_pb2.Input(splits=[
                example_gen_pb2.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                example_gen_pb2.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = range_config_pb2.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
examples = csv_input(input_dir)
example_gen = CsvExampleGen(input=examples, input_config=input,
                            range_config=range)

ExemploGen personalizado

Se os componentes ExampleGen disponíveis atualmente não atendem às suas necessidades, crie um ExampleGen personalizado, que incluirá um novo executor estendido de BaseExampleGenExecutor.

ExampleGen baseado em arquivo

Primeiro, estenda BaseExampleGenExecutor com um Beam PTransform personalizado, que fornece a conversão de sua divisão de entrada de trem / avaliação em exemplos TF. Por exemplo, o executor CsvExampleGen fornece a conversão de uma divisão CSV de entrada para exemplos TF.

Em seguida, crie um componente com o executor acima, como feito no componente CsvExampleGen . Como alternativa, passe um executor personalizado para o componente ExampleGen padrão, conforme mostrado abaixo.

from tfx.components.base import executor_spec
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen.csv_example_gen import executor
from tfx.utils.dsl_utils import external_input

examples = external_input(os.path.join(base_dir, 'data/simple'))
example_gen = FileBasedExampleGen(
    input=examples,
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Agora, também oferecemos suporte à leitura de arquivos Avro e Parquet usando esse método .

ExampleGen baseado em consulta

Primeiro, estenda BaseExampleGenExecutor com um Beam PTransform personalizado, que lê a partir da fonte de dados externa. Em seguida, crie um componente simples estendendo QueryBasedExampleGen.

Isso pode ou não exigir configurações de conexão adicionais. Por exemplo, o executor do BigQuery lê usando um conector beam.io padrão, que abstrai os detalhes de configuração da conexão. O executor Presto requer um Beam PTransform personalizado e um protobuf de configuração de conexão personalizado como entrada.

Se uma configuração de conexão for necessária para um componente ExampleGen personalizado, crie um novo protobuf e passe-o por meio de custom_config, que agora é um parâmetro de execução opcional. Abaixo está um exemplo de como usar um componente configurado.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Componentes de Downstream de ExampleGen

A configuração de divisão personalizada é suportada para componentes downstream.

StatisticsGen

O comportamento padrão é realizar a geração de estatísticas para todas as divisões.

Para excluir quaisquer divisões, defina o exclude_splits para o componente StatisticsGen. Por exemplo:

from tfx import components

...

# Exclude the 'eval' split.
statistics_gen = components.StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchemaGen

O comportamento padrão é gerar um esquema baseado em todas as divisões.

Para excluir quaisquer divisões, defina o exclude_splits para o componente SchemaGen. Por exemplo:

from tfx import components

...

# Exclude the 'eval' split.
schema_gen = components.SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

Validador Exemplo

O comportamento padrão é validar as estatísticas de todas as divisões nos exemplos de entrada em um esquema.

Para excluir quaisquer divisões, defina o exclude_splits para o componente ExampleValidator. Por exemplo:

from tfx import components

...

# Exclude the 'eval' split.
example_validator = components.ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Transformar

O comportamento padrão é analisar e produzir os metadados da divisão 'treinar' e transformar todas as divisões.

Para especificar as divisões de análise e as divisões de transformação, defina splits_config para o componente Transform. Por exemplo:

from tfx import components
from  tfx.proto import transform_pb2

...

# Analyze the 'train' split and transform all splits.
transform = components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=transform_pb2.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Trainer e Tuner

O comportamento padrão é treinar na divisão 'trem' e avaliar na divisão 'avaliação'.

Para especificar as divisões de trem e avaliar as divisões, defina train_args e eval_args para o componente Trainer. Por exemplo:

from tfx import components
from  tfx.proto import trainer_pb2

...

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = components.Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=trainer_pb2.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=trainer_pb2.EvalArgs(splits=['eval'], num_steps=5000))

Avaliador

O comportamento padrão é fornecer métricas calculadas na divisão 'eval'.

Para calcular uma estatística de avaliação em divisões personalizadas, defina example_splits para o componente Evaluator. Por exemplo:

from tfx import components
from  tfx.proto import evaluator_pb2

...

# Compute metrics on the 'eval1' split and the 'eval2' split.
Trainer = components.Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])