O componente de pipeline TFX ExampleGen

O componente ExampleGen TFX Pipeline ingere dados em pipelines TFX. Ele consome arquivos/serviços externos para gerar Exemplos que serão lidos por outros componentes do TFX. Ele também fornece partição consistente e configurável e embaralha o conjunto de dados para práticas recomendadas de ML.

  • Consome: dados de fontes de dados externas, como CSV, TFRecord , Avro, Parquet e BigQuery.
  • Emite: registros tf.Example , registros tf.SequenceExample ou formato proto, dependendo do formato da carga útil.

ExampleGen e outros componentes

ExampleGen fornece dados para componentes que usam a biblioteca TensorFlow Data Validation , como SchemaGen , StatisticsGen e Example Validator . Ele também fornece dados para Transform , que faz uso da biblioteca TensorFlow Transform e, por fim, para destinos de implantação durante a inferência.

Fontes e formatos de dados

Atualmente, uma instalação padrão do TFX inclui componentes ExampleGen completos para essas fontes e formatos de dados:

Também estão disponíveis executores personalizados que permitem o desenvolvimento de componentes ExampleGen para essas fontes e formatos de dados:

Consulte os exemplos de uso no código-fonte e esta discussão para obter mais informações sobre como usar e desenvolver executores personalizados.

Além disso, essas fontes de dados e formatos estão disponíveis como exemplos de componentes personalizados :

Ingerir formatos de dados suportados pelo Apache Beam

O Apache Beam suporta a ingestão de dados de uma ampla variedade de fontes e formatos de dados ( veja abaixo ). Esses recursos podem ser usados ​​para criar componentes ExampleGen personalizados para TFX, o que é demonstrado por alguns componentes ExampleGen existentes ( veja abaixo ).

Como usar um componente ExampleGen

Para fontes de dados compatíveis (atualmente, arquivos CSV, arquivos TFRecord com tf.Example , tf.SequenceExample e proto formato e resultados de consultas do BigQuery), o componente de pipeline ExampleGen pode ser usado diretamente na implantação e requer pouca personalização. Por exemplo:

example_gen = CsvExampleGen(input_base='data_root')

ou como abaixo para importar TFRecord externo com tf.Example diretamente:

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Span, Versão e Divisão

Um Span é um agrupamento de exemplos de treinamento. Se seus dados persistirem em um sistema de arquivos, cada Span poderá ser armazenado em um diretório separado. A semântica de um Span não é codificada no TFX; um período pode corresponder a um dia de dados, uma hora de dados ou qualquer outro agrupamento que seja significativo para sua tarefa.

Cada Span pode conter várias versões de dados. Para dar um exemplo, se você remover alguns exemplos de um Span para limpar dados de baixa qualidade, isso poderá resultar em uma nova Versão desse Span. Por padrão, os componentes do TFX operam na versão mais recente em um Span.

Cada Versão dentro de um Span pode ser subdividida em várias Divisões. O caso de uso mais comum para dividir um Span é dividi-lo em dados de treinamento e de avaliação.

Intervalos e divisões

Divisão de entrada/saída personalizada

Para personalizar a proporção de divisão train/eval que o ExampleGen produzirá, defina o output_config para o componente ExampleGen. Por exemplo:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Observe como os hash_buckets foram definidos neste exemplo.

Para uma fonte de entrada que já foi dividida, defina o input_config para o componente ExampleGen:


# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

Para gen de exemplo baseado em arquivo (por exemplo, CsvExampleGen e ImportExampleGen), pattern é um padrão de arquivo relativo glob que mapeia para arquivos de entrada com diretório raiz fornecido pelo caminho base de entrada. Para geração de exemplo baseada em consulta (por exemplo, BigQueryExampleGen, PrestoExampleGen), o pattern é uma consulta SQL.

Por padrão, todo o diretório base de entrada é tratado como uma única divisão de entrada e a divisão de saída train e eval é gerada com uma proporção de 2:1.

Consulte proto/example_gen.proto para obter a configuração de divisão de entrada e saída do ExampleGen. E consulte o guia de componentes downstream para utilizar as divisões personalizadas downstream.

Método de divisão

Ao usar o método de divisão hash_buckets , em vez de todo o registro, pode-se usar um recurso para particionar os exemplos. Se um recurso estiver presente, ExampleGen usará uma impressão digital desse recurso como a chave de partição.

Esse recurso pode ser usado para manter uma divisão estável com certas propriedades de exemplos: por exemplo, um usuário sempre será colocado na mesma divisão se "user_id" for selecionado como o nome do recurso de partição.

A interpretação do que um "recurso" significa e como combinar um "recurso" com o nome especificado depende da implementação de ExampleGen e do tipo dos exemplos.

Para implementações de ExampleGen prontas:

  • Se ele gerar tf.Example, então um "recurso" significa uma entrada em tf.Example.features.feature.
  • Se ele gerar tf.SequenceExample, então um "recurso" significa uma entrada em tf.SequenceExample.context.feature.
  • Apenas os recursos int64 e bytes são suportados.

Nos seguintes casos, ExampleGen gera erros de tempo de execução:

  • O nome do recurso especificado não existe no exemplo.
  • Recurso vazio: tf.train.Feature() .
  • Tipos de recursos não suportados, por exemplo, recursos flutuantes.

Para gerar a divisão train/eval com base em um recurso nos exemplos, defina o output_config para o componente ExampleGen. Por exemplo:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Observe como o partition_feature_name foi definido neste exemplo.

Período

Span pode ser recuperado usando a especificação '{SPAN}' no padrão glob de entrada :

  • Essa especificação corresponde aos dígitos e mapeia os dados nos números SPAN relevantes. Por exemplo, 'data_{SPAN}-*.tfrecord' coletará arquivos como 'data_12-a.tfrecord', 'date_12-b.tfrecord'.
  • Opcionalmente, essa especificação pode ser especificada com a largura dos inteiros quando mapeada. Por exemplo, 'data_{SPAN:2}.file' mapeia para arquivos como 'data_02.file' e 'data_27.file' (como entradas para Span-2 e Span-27 respectivamente), mas não mapeia para 'data_1. file' nem 'data_123.file'.
  • Quando a especificação SPAN está ausente, assume-se que é sempre Span '0'.
  • Se SPAN for especificado, o pipeline processará o intervalo mais recente e armazenará o número do intervalo nos metadados.

Por exemplo, vamos supor que existam dados de entrada:

  • '/tmp/span-1/train/data'
  • '/tmp/span-1/eval/data'
  • '/tmp/span-2/train/data'
  • '/tmp/span-2/eval/data'

e a configuração de entrada é mostrada abaixo:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

ao acionar o pipeline, ele processará:

  • '/tmp/span-2/train/data' como divisão de trem
  • '/tmp/span-2/eval/data' como divisão eval

com o número do intervalo como '2'. Se mais tarde '/tmp/span-3/...' estiver pronto, simplesmente acione o pipeline novamente e ele pegará o intervalo '3' para processamento. Abaixo mostra o exemplo de código para usar a especificação span:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

A recuperação de um determinado período pode ser feita com RangeConfig, detalhado abaixo.

Encontro

Se sua fonte de dados estiver organizada no sistema de arquivos por data, o TFX oferece suporte ao mapeamento de datas diretamente para números de intervalo. Existem três especificações para representar o mapeamento de datas para períodos: {YYYY}, {MM} e {DD}:

  • As três especificações devem estar presentes no padrão glob de entrada, se alguma for especificada:
  • A especificação {SPAN} ou este conjunto de especificações de data podem ser especificadas exclusivamente.
  • Uma data do calendário com o ano de YYYY, o mês de MM e o dia do mês de DD é calculada, então o número do intervalo é calculado como o número de dias desde a época unix (ou seja, 1970-01-01). Por exemplo, 'log-{YYYY}{MM}{DD}.data' corresponde a um arquivo 'log-19700101.data' e o consome como entrada para Span-0 e 'log-20170101.data' como entrada para Span-17167.
  • Se esse conjunto de especificações de data for especificado, o pipeline processará a data mais recente e armazenará o número de intervalo correspondente nos metadados.

Por exemplo, vamos supor que existam dados de entrada organizados por data de calendário:

  • '/tmp/1970-01-02/train/data'
  • '/tmp/1970-01-02/eval/data'
  • '/tmp/1970-01-03/train/data'
  • '/tmp/1970-01-03/eval/data'

e a configuração de entrada é mostrada abaixo:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

ao acionar o pipeline, ele processará:

  • '/tmp/1970-01-03/train/data' como divisão de trem
  • '/tmp/1970-01-03/eval/data' como divisão eval

com o número do intervalo como '2'. Se mais tarde '/tmp/1970-01-04/...' estiver pronto, simplesmente acione o pipeline novamente e ele pegará o intervalo '3' para processamento. Abaixo mostra o exemplo de código para usar a especificação de data:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Versão

A versão pode ser recuperada usando a especificação '{VERSION}' no padrão glob de entrada :

  • Essa especificação corresponde aos dígitos e mapeia os dados para os números de VERSÃO relevantes no SPAN. Observe que a especificação de versão pode ser usada em combinação com a especificação de período ou data.
  • Esta especificação também pode ser opcionalmente especificada com a largura da mesma forma que a especificação SPAN. por exemplo, 'span-{SPAN}/version-{VERSION:4}/data-*'.
  • Quando a especificação VERSION está ausente, a versão é definida como Nenhuma.
  • Se SPAN e VERSION forem especificados, o pipeline processará a versão mais recente para o intervalo mais recente e armazenará o número da versão nos metadados.
  • Se VERSION for especificado, mas não SPAN (ou especificação de data), um erro será gerado.

Por exemplo, vamos supor que existam dados de entrada:

  • '/tmp/span-1/ver-1/train/data'
  • '/tmp/span-1/ver-1/eval/data'
  • '/tmp/span-2/ver-1/train/data'
  • '/tmp/span-2/ver-1/eval/data'
  • '/tmp/span-2/ver-2/train/data'
  • '/tmp/span-2/ver-2/eval/data'

e a configuração de entrada é mostrada abaixo:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

ao acionar o pipeline, ele processará:

  • '/tmp/span-2/ver-2/train/data' como divisão de trem
  • '/tmp/span-2/ver-2/eval/data' como divisão eval

com número de extensão como '2' e número de versão como '2'. Se mais tarde '/tmp/span-2/ver-3/...' estiver pronto, simplesmente acione o pipeline novamente e ele pegará o span '2' e a versão '3' para processamento. Abaixo mostra o exemplo de código para usar a especificação da versão:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Configuração de intervalo

O TFX oferece suporte à recuperação e ao processamento de um intervalo específico no ExampleGen baseado em arquivo usando a configuração de intervalo, uma configuração abstrata usada para descrever intervalos para diferentes entidades do TFX. Para recuperar um intervalo específico, defina o range_config para um componente ExampleGen baseado em arquivo. Por exemplo, vamos supor que existam dados de entrada:

  • '/tmp/span-01/train/data'
  • '/tmp/span-01/eval/data'
  • '/tmp/span-02/train/data'
  • '/tmp/span-02/eval/data'

Para recuperar e processar dados especificamente com intervalo '1', especificamos uma configuração de intervalo além da configuração de entrada. Observe que ExampleGen oferece suporte apenas a intervalos estáticos de intervalo único (para especificar o processamento de intervalos individuais específicos). Assim, para StaticRange, start_span_number deve ser igual a end_span_number. Usando a extensão fornecida e as informações de largura da extensão (se fornecidas) para preenchimento de zero, ExampleGen substituirá a especificação SPAN nos padrões de divisão fornecidos pelo número de extensão desejado. Um exemplo de uso é mostrado abaixo:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

A configuração de intervalo também pode ser usada para processar datas específicas, se a especificação de data for usada em vez da especificação de SPAN. Por exemplo, vamos supor que existam dados de entrada organizados por data de calendário:

  • '/tmp/1970-01-02/train/data'
  • '/tmp/1970-01-02/eval/data'
  • '/tmp/1970-01-03/train/data'
  • '/tmp/1970-01-03/eval/data'

Para recuperar e processar dados especificamente em 2 de janeiro de 1970, fazemos o seguinte:

from  tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

Exemplo personalizadoGen

Se os componentes ExampleGen atualmente disponíveis não atenderem às suas necessidades, você poderá criar um ExampleGen personalizado, que permitirá a leitura de diferentes fontes de dados ou em diferentes formatos de dados.

Personalização de geração de exemplo baseada em arquivo (experimental)

Primeiro, estenda o BaseExampleGenExecutor com um Beam PTransform personalizado, que fornece a conversão de sua divisão de entrada train/eval para exemplos de TF. Por exemplo, o executor CsvExampleGen fornece a conversão de uma divisão de CSV de entrada para exemplos de TF.

Em seguida, crie um componente com o executor acima, conforme feito no componente CsvExampleGen . Como alternativa, passe um executor personalizado para o componente ExampleGen padrão, conforme mostrado abaixo.

from tfx.components.base import executor_spec
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Agora, também suportamos a leitura de arquivos Avro e Parquet usando este método .

Formatos de dados adicionais

O Apache Beam suporta a leitura de vários formatos de dados adicionais . através de Transformadas de E/S de Feixe. Você pode criar componentes ExampleGen personalizados aproveitando o Beam I/O Transforms usando um padrão semelhante ao exemplo do Avro

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))

No momento da redação deste artigo, os formatos e fontes de dados atualmente suportados para o SDK do Beam Python incluem:

  • Amazon S3
  • Apache Avro
  • Apache Hadoop
  • Apache Kafka
  • Apache Parquet
  • Google Cloud BigQuery
  • Google Cloud BigTable
  • Armazenamento de dados do Google Cloud
  • Google Cloud Pub/Sub
  • Armazenamento em nuvem do Google (GCS)
  • MongoDB

Verifique os documentos do Beam para obter a lista mais recente.

Personalização de geração de exemplo com base em consulta (experimental)

Primeiro, estenda o BaseExampleGenExecutor com um Beam PTransform personalizado, que lê a partir da fonte de dados externa. Em seguida, crie um componente simples estendendo QueryBasedExampleGen.

Isso pode ou não exigir configurações de conexão adicionais. Por exemplo, o executor do BigQuery lê usando um conector beam.io padrão, que abstrai os detalhes de configuração da conexão. O executor Presto requer um Beam PTransform personalizado e um protobuf de configuração de conexão personalizado como entrada.

Se uma configuração de conexão for necessária para um componente ExampleGen personalizado, crie um novo protobuf e passe-o por custom_config, que agora é um parâmetro de execução opcional. Abaixo está um exemplo de como usar um componente configurado.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Componentes a jusante de exemplo

A configuração de divisão personalizada é suportada para componentes downstream.

EstatísticaGen

O comportamento padrão é realizar a geração de estatísticas para todas as divisões.

Para excluir quaisquer divisões, defina o exclude_splits para o componente StatisticsGen. Por exemplo:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchemaGen

O comportamento padrão é gerar um esquema baseado em todas as divisões.

Para excluir quaisquer divisões, defina o exclude_splits para o componente SchemaGen. Por exemplo:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

Validador de exemplo

O comportamento padrão é validar as estatísticas de todas as divisões nos exemplos de entrada em relação a um esquema.

Para excluir quaisquer divisões, defina o exclude_splits para o componente ExampleValidator. Por exemplo:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Transformar

O comportamento padrão é analisar e produzir os metadados da divisão 'treinar' e transformar todas as divisões.

Para especificar as divisões de análise e de transformação, defina o splits_config para o componente Transform. Por exemplo:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Treinador e Afinador

O comportamento padrão é treinar na divisão 'treinar' e avaliar na divisão 'eval'.

Para especificar as divisões de trem e avaliar as divisões, defina os train_args e eval_args para Trainer. Por exemplo:

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

Avaliador

O comportamento padrão é fornecer métricas calculadas na divisão 'eval'.

Para calcular estatísticas de avaliação em divisões personalizadas, defina o componente example_splits para Evaluator. Por exemplo:

# Compute metrics on the 'eval1' split and the 'eval2' split.
evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

Mais detalhes estão disponíveis na referência da API CsvExampleGen, implementação da API FileBasedExampleGen e referência da API ImportExampleGen .