Esta página foi traduzida pela API Cloud Translation.
Switch to English

Primeiros passos com a TensorFlow Transform

Este guia apresenta os conceitos básicos de tf.Transform e como usá-los. Será:

  • Defina uma função de pré - processamento , uma descrição lógica do pipeline que transforma os dados brutos nos dados usados ​​para treinar um modelo de aprendizado de máquina.
  • Mostre a implementação do Apache Beam usada para transformar dados, convertendo a função de pré - processamento em um pipeline do Beam .
  • Mostre exemplos de uso adicionais.

Defina uma função de pré-processamento

A função de pré-processamento é o conceito mais importante de tf.Transform . A função de pré-processamento é uma descrição lógica de uma transformação do conjunto de dados. A função de pré-processamento aceita e retorna um dicionário de tensores, onde um tensor significa Tensor ou 2D SparseTensor . Existem dois tipos de funções usadas para definir a função de pré-processamento:

  1. Qualquer função que aceita e retorna tensores. Isso adiciona operações do TensorFlow ao gráfico que transformam dados brutos em dados transformados.
  2. Qualquer um dos analisadores fornecidos por tf.Transform . Os analisadores também aceitam e retornam tensores, mas, ao contrário das funções do TensorFlow, eles não adicionam operações ao gráfico. Em vez disso, os analisadores fazem com que o tf.Transform calcule uma operação de passagem completa fora do TensorFlow. Eles usam os valores do tensor de entrada em todo o conjunto de dados para gerar um tensor constante que é retornado como saída. Por exemplo, tft.min calcula o mínimo de um tensor sobre o conjunto de dados. tf.Transform fornece um conjunto fixo de analisadores, mas isso será estendido em versões futuras.

Exemplo de função de pré-processamento

Ao combinar analisadores e funções regulares do TensorFlow, os usuários podem criar pipelines flexíveis para transformar dados. A função de pré-processamento a seguir transforma cada um dos três recursos de maneiras diferentes e combina dois dos recursos:

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

Aqui, x , y e s são Tensor que representam recursos de entrada. O primeiro novo tensor criado, x_centered , é construído aplicando tft.mean x e subtraindo isso de x . tft.mean(x) retorna um tensor que representa a média do tensor x . x_centered é o tensor x com a média subtraída.

O segundo novo tensor, y_normalized , é criado de maneira semelhante, mas usando o método de conveniência tft.scale_to_0_1 . Este método faz algo semelhante a calcular x_centered , ou seja, calcular um máximo e um mínimo e usá-los para dimensionar y .

O tensor s_integerized mostra um exemplo de manipulação de string. Nesse caso, pegamos uma string e a mapeamos para um inteiro. Isso usa a função de conveniência tft.compute_and_apply_vocabulary . Essa função usa um analisador para calcular os valores exclusivos obtidos pelas strings de entrada e, em seguida, usa as operações do TensorFlow para converter as strings de entrada em índices na tabela de valores exclusivos.

A coluna final mostra que é possível usar as operações do TensorFlow para criar novos recursos combinando tensores.

A função de pré-processamento define um pipeline de operações em um conjunto de dados. Para aplicar o pipeline, contamos com uma implementação concreta da API tf.Transform . A implementação do Apache Beam fornece PTransform que aplica a função de pré-processamento do usuário aos dados. O fluxo de trabalho típico de um usuário tf.Transform construirá uma função de pré-processamento e a incorporará em um pipeline maior do Beam, criando os dados para treinamento.

Lote

Os lotes são uma parte importante do TensorFlow. Como um dos objetivos do tf.Transform é fornecer um gráfico do TensorFlow para pré-processamento que pode ser incorporado ao gráfico de veiculação (e, opcionalmente, ao gráfico de treinamento), o tf.Transform em tf.Transform também é um conceito importante em tf.Transform .

Embora não seja óbvio no exemplo acima, a função de pré-processamento definida pelo usuário recebe tensores que representam lotes e não instâncias individuais, como acontece durante o treinamento e a disponibilização com o TensorFlow. Por outro lado, os analisadores realizam um cálculo em todo o conjunto de dados que retorna um único valor e não um lote de valores. x é um Tensor com a forma (batch_size,) , enquanto tft.mean(x) é um Tensor com a forma () . A subtração x - tft.mean(x) transmite onde o valor de tft.mean(x) é subtraído de cada elemento do lote representado por x .

Implementação do Apache Beam

Embora a função de pré - processamento seja uma descrição lógica de um pipeline de pré - processamento implementado em várias estruturas de processamento de dados, tf.Transform fornece uma implementação canônica usada no Apache Beam. Esta implementação demonstra a funcionalidade exigida de uma implementação. Não há API formal para essa funcionalidade, portanto, cada implementação pode usar uma API idiomática para sua estrutura de processamento de dados específica.

A implementação do Apache Beam fornece dois PTransform s usados ​​para processar dados para uma função de pré-processamento. O seguinte mostra o uso do composto PTransform AnalyzeAndTransformDataset :

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

O transformed_data conteúdo é mostrado abaixo e contém as colunas transformadas no mesmo formato que os dados em bruto. Em particular, os valores de s_integerized são [0, 1, 0] - esses valores dependem de como as palavras hello e world foram mapeadas para inteiros, o que é determinístico. Para a coluna x_centered , x_centered a média para que os valores da coluna x , que eram [1.0, 2.0, 3.0] , se tornassem [-1.0, 0.0, 1.0] . Da mesma forma, o restante das colunas correspondem aos valores esperados.

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

Tanto raw_data quanto transformed_data são conjuntos de dados. As próximas duas seções mostram como a implementação do Beam representa conjuntos de dados e como ler e gravar dados no disco. O outro valor de retorno, transform_fn , representa a transformação aplicada aos dados, abordada em detalhes abaixo.

O AnalyzeAndTransformDataset é a composição das duas transformações fundamentais fornecidas pela implementação AnalyzeDataset e TransformDataset . Portanto, os dois snippets de código a seguir são equivalentes:

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn é uma função pura que representa uma operação que é aplicada a cada linha do conjunto de dados. Em particular, os valores do analisador já são calculados e tratados como constantes. No exemplo, o transform_fn contém como constantes a média da coluna x , o mínimo e o máximo da coluna y e o vocabulário usado para mapear as strings para inteiros.

Um recurso importante de tf.Transform é que transform_fn representa um mapa sobre linhas - é uma função pura aplicada a cada linha separadamente. Todo o cálculo para agregar linhas é feito em AnalyzeDataset . Além disso, o transform_fn é representado como um Graph TensorFlow que pode ser incorporado ao gráfico de veiculação.

AnalyzeAndTransformDataset é fornecido para otimizações neste caso especial. Este é o mesmo padrão usado no scikit-learn , fornecendo os métodos fit , transform e fit_transform .

Formatos e esquema de dados

A implementação do TFT Beam aceita dois formatos de dados de entrada diferentes. O formato "instance dict" (como visto no exemplo acima e em simple_example.py ) é um formato intuitivo e é adequado para pequenos conjuntos de dados, enquanto o formato TFXIO ( Apache Arrow ) fornece desempenho aprimorado e é adequado para grandes conjuntos de dados.

A implementação do Beam informa em qual formato a PCollection de entrada estaria pelos "metadados" que acompanham a PCollection:

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • Se raw_data_metadata é um dataset_metadata.DatasetMetadata (veja abaixo, "O formato 'instance dict'"), então raw_data deve estar no formato "instance dict".
  • Se raw_data_metadata for um tfxio.TensorAdapterConfig (consulte a seção "O formato TFXIO" abaixo), espera-se que raw_data esteja no formato TFXIO.

O formato "instance dict"

Nos exemplos de código anteriores, o código que define raw_data_metadata foi omitido. Os metadados contêm o esquema que define o layout dos dados para que sejam lidos e gravados em vários formatos. Até mesmo o formato na memória mostrado na última seção não é autodescritivo e requer o esquema para ser interpretado como tensores.

Aqui está a definição do esquema para os dados de exemplo:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

A classe dataset_schema.Schema contém as informações necessárias para analisar os dados de seu formato no disco ou na memória em tensores. Normalmente, é construído chamando schema_utils.schema_from_feature_spec com as chaves de recurso de mapeamento de tf.io.FixedLenFeature para os valores tf.io.FixedLenFeature , tf.io.VarLenFeature e tf.io.SparseFeature . Consulte a documentação de tf.parse_example para obter mais detalhes.

Acima, usamos tf.io.FixedLenFeature para indicar que cada recurso contém um número fixo de valores, neste caso, um único valor escalar. Como tf.Transform instâncias, o Tensor real que representa o recurso terá forma (None,) que a dimensão desconhecida é a dimensão do lote.

O formato TFXIO

Com este formato, espera-se que os dados estejam contidos em um pyarrow.RecordBatch . Para dados tabulares, nossa implementação do Apache Beam aceita Arrow RecordBatch que consistem em colunas dos seguintes tipos:

  • pa.list_(<primitive>) , onde <primitive> é pa.int64() , pa.float32() pa.binary() ou pa.large_binary() .

  • pa.large_list(<primitive>)

O conjunto de dados de entrada de brinquedo que usamos acima, quando representado como um RecordBatch , tem a seguinte aparência:

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

Semelhante ao DatasetMetadata sendo necessário para acompanhar o formato "instance dict", um tfxio.TensorAdapterConfig é necessário para acompanhar os RecordBatch es. Ele consiste no esquema Arrow dos RecordBatch es e TensorRepresentations para determinar exclusivamente como as colunas em RecordBatch es podem ser interpretadas como TensorFlow Tensor (incluindo, mas não se limitando a tf.Tensor, tf.SparseTensor).

TensorRepresentations é um Dict[Text, TensorRepresentation] que estabelece a relação entre um Tensor que preprocessing_fn aceita e colunas nos RecordBatch es. Por exemplo:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

Significa que as inputs['x'] em preprocessing_fn devem ser um tf.Tensor denso, cujos valores vêm de uma coluna de nome 'col1' nos RecordBatch entrada, e sua forma (em lote) deve ser [batch_size, 2] .

TensorRepresentation é um Protobuf definido em TensorFlow Metadata .

Entrada e saída com Apache Beam

Até agora, vimos dados de entrada e saída em listas python (de RecordBatch ou dicionários de instância). Esta é uma simplificação que depende da capacidade do Apache Beam de trabalhar com listas, bem como de sua principal representação de dados, a PCollection .

Uma PCollection é uma representação de dados que faz parte de um pipeline do Beam. Um pipeline do Beam é formado pela aplicação de vários PTransform s, incluindo AnalyzeDataset e TransformDataset , e pela execução do pipeline. Uma PCollection não é criada na memória do binário principal, mas sim distribuída entre os trabalhadores (embora esta seção use o modo de execução na memória).

Fontes pré-enlatadas de PCollection ( TFXIO )

O formato RecordBatch que nossa implementação aceita é um formato comum que outras bibliotecas TFX aceitam. Portanto, o TFX oferece "fontes" convenientes (também conhecidas como TFXIO ) que lêem arquivos de vários formatos em disco e produzem RecordBatch e também podem fornecer TensorAdapterConfig , incluindo TensorRepresentations inferidos.

Esses TFXIO s podem ser encontrados no pacote tfx_bsl ( tfx_bsl.public.tfxio ).

Exemplo: conjunto de dados "Renda do censo"

O exemplo a seguir requer a leitura e gravação de dados em disco e a representação de dados como uma PCollection (não uma lista), consulte: census_example.py . Abaixo, mostramos como baixar os dados e executar este exemplo. O conjunto de dados "Census Income" é fornecido pelo UCI Machine Learning Repository . Este conjunto de dados contém dados categóricos e numéricos.

Os dados estão no formato CSV, aqui estão as duas primeiras linhas:

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

As colunas do conjunto de dados são categóricas ou numéricas. Este conjunto de dados descreve um problema de classificação: prever a última coluna em que o indivíduo ganha mais ou menos de 50 mil por ano. No entanto, da perspectiva de tf.Transform , esse rótulo é apenas outra coluna categórica.

Usamos um TFXIO pré-enlatado, BeamRecordCsvTFXIO para traduzir as linhas CSV em RecordBatches . TFXIO requer duas informações importantes:

  • um esquema de metadados do TensorFlow que contém informações de tipo e forma sobre cada coluna CSV. TensorRepresentation s são uma parte opcional do Schema; se não forem fornecidos (que é o caso neste exemplo), eles serão inferidos a partir das informações de tipo e forma. É possível obter o esquema usando uma função auxiliar que fornecemos para traduzir as especificações de análise TF (mostradas neste exemplo) ou executando a validação de dados do TensorFlow .

  • uma lista de nomes de colunas, na ordem em que aparecem no arquivo CSV. Observe que esses nomes devem corresponder aos nomes dos recursos no Esquema.

Neste exemplo, permitimos que o recurso education-num esteja ausente. Isso significa que ele é representado como tf.io.VarLenFeature no feature_spec e como tf.SparseTensor no preprocessing_fn . Outros recursos se tornarão tf.Tensor de mesmo nome no preprocessing_fn .

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

Observe que tivemos que fazer alguns ajustes adicionais depois que as linhas CSV foram lidas. Caso contrário, poderíamos contar com o CsvTFXIO para lidar com a leitura dos arquivos e a tradução para os RecordBatch :

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

O pré-processamento é semelhante ao exemplo anterior, exceto que a função de pré-processamento é gerada programaticamente em vez de especificar manualmente cada coluna. Na função de pré-processamento abaixo, NUMERICAL_COLUMNS e CATEGORICAL_COLUMNS são listas que contêm os nomes das colunas numéricas e categóricas:

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

Uma diferença do exemplo anterior é que a coluna do rótulo especifica manualmente o mapeamento da string para um índice. Portanto, '>50' é mapeado para 0 e '<=50K' é mapeado para 1 pois é útil saber qual índice no modelo treinado corresponde a qual rótulo.

A variável record_batches representa uma PCollection de pyarrow.RecordBatch es. O tensor_adapter_config é fornecido por csv_tfxio , que é inferido do SCHEMA (e, finalmente, neste exemplo, das especificações de análise TF).

O estágio final é gravar os dados transformados no disco e tem uma forma semelhante à leitura dos dados brutos. O esquema usado para fazer isso é parte da saída de AnalyzeAndTransformDataset que infere um esquema para os dados de saída. O código para gravar no disco é mostrado abaixo. O esquema é uma parte dos metadados, mas usa os dois de forma intercambiável na API tf.Transform (ou seja, passa os metadados para o ExampleProtoCoder ). Esteja ciente de que isso grava em um formato diferente. Em vez de textio.WriteToText , use o suporte TFRecord do TFRecord para o formato TFRecord e use um codificador para codificar os dados como protos de Example . Este é o melhor formato para usar no treinamento, conforme mostrado na próxima seção. transformed_eval_data_base fornece o nome de arquivo base para os shards individuais que são gravados.

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

Além dos dados de treinamento, transform_fn também é escrito com os metadados:

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

Execute todo o pipeline do Beam com p.run().wait_until_finish() . Até este ponto, o pipeline do Beam representa um cálculo diferido e distribuído. Ele fornece instruções sobre o que será feito, mas as instruções não foram executadas. Esta chamada final executa o pipeline especificado.

Baixe o conjunto de dados do censo

Baixe o conjunto de dados do censo usando os seguintes comandos de shell:

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

Ao executar o script census_example.py , passe o diretório que contém esses dados como primeiro argumento. O script cria um subdiretório temporário para adicionar os dados pré-processados.

Integre com o treinamento TensorFlow

A seção final de census_example.py mostra como os dados pré-processados ​​são usados ​​para treinar um modelo. Consulte a documentação dos Estimadores para obter detalhes. O primeiro passo é construir um Estimator que requer uma descrição das colunas pré-processadas. Cada coluna numérica é descrita como um real_valued_column que é um invólucro em torno de um vetor denso com um tamanho fixo ( 1 neste exemplo). Cada coluna categórica é mapeada de string para inteiros e, em seguida, é passada para a coluna do indicator_column . tft.TFTransformOutput é usado para localizar o caminho do arquivo de vocabulário para cada característica categórica.

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

A próxima etapa é criar um construtor para gerar a função de entrada para treinamento e avaliação. O difere do treinamento usado por tf.Learn pois uma especificação de recurso não é necessária para analisar os dados transformados. Em vez disso, use os metadados para os dados transformados para gerar uma especificação de recurso.

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

O código restante é o mesmo que usar a classe Estimator . O exemplo também contém código para exportar o modelo no formato SavedModel . O modelo exportado pode ser usado pelo Tensorflow Serving ou pelo Cloud ML Engine .