¡Confirme su asistencia a su evento local de TensorFlow Everywhere hoy!
Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Comience con TensorFlow Transform

Esta guía presenta los conceptos básicos de tf.Transform y cómo usarlos. Va a:

  • Defina una función de preprocesamiento , una descripción lógica de la canalización que transforma los datos sin procesar en los datos utilizados para entrenar un modelo de aprendizaje automático.
  • Muestre la implementación de Apache Beam utilizada para transformar datos convirtiendo la función de preprocesamiento en una canalización de Beam .
  • Muestre ejemplos de uso adicionales.

Definir una función de preprocesamiento

La función de preprocesamiento es el concepto más importante de tf.Transform . La función de preprocesamiento es una descripción lógica de una transformación del conjunto de datos. La función de preprocesamiento acepta y devuelve un diccionario de tensores, donde un tensor significa Tensor o SparseTensor . Hay dos tipos de funciones que se utilizan para definir la función de preprocesamiento:

  1. Cualquier función que acepte y devuelva tensores. Estos agregan operaciones de TensorFlow al gráfico que transforman los datos sin procesar en datos transformados.
  2. Cualquiera de los analizadores proporcionados por tf.Transform . Los analizadores también aceptan y devuelven tensores, pero a diferencia de las funciones de TensorFlow, no agregan operaciones al gráfico. En cambio, los analizadores hacen que tf.Transform calcule una operación de pase completo fuera de TensorFlow. Usan los valores del tensor de entrada en todo el conjunto de datos para generar un tensor constante que se devuelve como salida. Por ejemplo, tft.min calcula el mínimo de un tensor sobre el conjunto de datos. tf.Transform proporciona un conjunto fijo de analizadores, pero esto se ampliará en versiones futuras.

Ejemplo de función de preprocesamiento

Al combinar analizadores y funciones regulares de TensorFlow, los usuarios pueden crear canalizaciones flexibles para transformar datos. La siguiente función de preprocesamiento transforma cada una de las tres características de diferentes maneras y combina dos de las características:

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

Aquí, x , y y s son Tensor que representan características de entrada. El primer tensor de nuevo que se crea, x_centered , se construye mediante la aplicación de tft.mean a x y restando esto desde x . tft.mean(x) devuelve un tensor que representa la media del tensor x . x_centered es el tensor x con la media restada.

El segundo tensor nuevo, y_normalized , se crea de manera similar pero usando el método de conveniencia tft.scale_to_0_1 . Este método hace algo similar a calcular x_centered , es decir, calcular un máximo y un mínimo y usarlos para escalar y .

El tensor s_integerized muestra un ejemplo de manipulación de cadenas. En este caso, tomamos una cadena y la asignamos a un número entero. Utiliza la función de convenienciatft.compute_and_apply_vocabulary . Esta función usa un analizador para calcular los valores únicos tomados por las cadenas de entrada y luego usa las operaciones de TensorFlow para convertir las cadenas de entrada en índices en la tabla de valores únicos.

La columna final muestra que es posible usar operaciones de TensorFlow para crear nuevas funciones combinando tensores.

La función de preprocesamiento define una canalización de operaciones en un conjunto de datos. Para aplicar la canalización, confiamos en una implementación concreta de la API tf.Transform . La implementación de Apache Beam proporciona PTransform que aplica la función de preprocesamiento de un usuario a los datos. El flujo de trabajo típico de un usuario de tf.Transform construirá una función de preprocesamiento y luego la incorporará en una canalización de Beam más grande, creando los datos para el entrenamiento.

Por lotes

El procesamiento por lotes es una parte importante de TensorFlow. Dado que uno de los objetivos de tf.Transform es proporcionar un gráfico de TensorFlow para el preprocesamiento que se puede incorporar al gráfico de servicio (y, opcionalmente, al gráfico de entrenamiento), el procesamiento por lotes también es un concepto importante en tf.Transform .

Aunque no es obvio en el ejemplo anterior, la función de preprocesamiento definida por el usuario pasa tensores que representan lotes y no instancias individuales, como sucede durante el entrenamiento y la entrega con TensorFlow. Por otro lado, los analizadores realizan un cálculo sobre todo el conjunto de datos que devuelve un valor único y no un lote de valores. x es un Tensor con una forma de (batch_size,) , mientras que tft.mean(x) es un Tensor con una forma de () . La resta x - tft.mean(x) difunde donde el valor de tft.mean(x) se resta de cada elemento del lote representado por x .

Implementación de Apache Beam

Si bien la función de preprocesamiento está pensada como una descripción lógica de una canalización de preprocesamiento implementada en múltiples marcos de procesamiento de datos, tf.Transform proporciona una implementación canónica utilizada en Apache Beam. Esta implementación demuestra la funcionalidad requerida de una implementación. No existe una API formal para esta funcionalidad, por lo que cada implementación puede usar una API que sea idiomática para su marco de procesamiento de datos particular.

La implementación de Apache Beam proporciona dos PTransform utilizan para procesar datos para una función de preprocesamiento. A continuación se muestra el uso del PTransform AnalyzeAndTransformDataset :

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

El contenido de datos transformed_data se muestra a continuación y contiene las columnas transformadas en el mismo formato que los datos sin procesar. En particular, los valores de s_integerized son [0, 1, 0] estos valores dependen de cómo se asignaron las palabras hello y world a números enteros, lo cual es determinista. Para la columna x_centered , restamos la media para que los valores de la columna x , que eran [1.0, 2.0, 3.0] , se convirtieran en [-1.0, 0.0, 1.0] . Del mismo modo, el resto de las columnas coincide con sus valores esperados.

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

Tanto raw_data como transformed_data son conjuntos de datos. Las siguientes dos secciones muestran cómo la implementación de Beam representa conjuntos de datos y cómo leer y escribir datos en el disco. El otro valor de retorno, transform_fn , representa la transformación aplicada a los datos, que se describe en detalle a continuación.

AnalyzeAndTransformDataset es la composición de las dos transformaciones fundamentales proporcionadas por la implementación AnalyzeDataset y TransformDataset . Entonces, los siguientes dos fragmentos de código son equivalentes:

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn es una función pura que representa una operación que se aplica a cada fila del conjunto de datos. En particular, los valores del analizador ya están calculados y tratados como constantes. En el ejemplo, transform_fn contiene como constantes la media de la columna x , el mínimo y el máximo de la columna y , y el vocabulario utilizado para asignar las cadenas a números enteros.

Una característica importante de tf.Transform es que transform_fn representa un mapa sobre filas ; es una función pura aplicada a cada fila por separado. Todo el cálculo para agregar filas se realiza en AnalyzeDataset . Además, transform_fn se representa como un Graph TensorFlow que se puede incrustar en el gráfico de servicio.

AnalyzeAndTransformDataset se proporciona para optimizaciones en este caso especial. Este es el mismo patrón utilizado en scikit-learn , que proporciona los métodos fit , transform y fit_transform .

Formatos de datos y esquema

La implementación de TFT Beam acepta dos formatos de datos de entrada diferentes. El formato de "dictado de instancia" (como se ve en el ejemplo anterior y en simple_example.py ) es un formato intuitivo y es adecuado para pequeños conjuntos de datos, mientras que el formato TFXIO ( Apache Arrow ) proporciona un rendimiento mejorado y es adecuado para grandes conjuntos de datos.

La implementación de Beam indica en qué formato estaría la PCollection de entrada mediante los "metadatos" que acompañan a la PCollection:

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • Si raw_data_metadata es un dataset_metadata.DatasetMetadata (véase más adelante, "La 'ejemplo dict' formato" sección), a continuación, raw_data se espera que sea en el formato de "instancia dict".
  • Si raw_data_metadata es un tfxio.TensorAdapterConfig (véase más adelante, "El formato TFXIO" sección), a continuación, raw_data se espera que sea en el formato TFXIO.

El formato "dictado de instancia"

En los ejemplos de código anteriores, se omite el código que define raw_data_metadata . Los metadatos contienen el esquema que define el diseño de los datos para que se lean y escriban en varios formatos. Incluso el formato en memoria que se muestra en la última sección no es autodescriptivo y requiere el esquema para ser interpretado como tensores.

Aquí está la definición del esquema para los datos de ejemplo:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

La clase dataset_schema.Schema contiene la información necesaria para analizar los datos de su formato en disco o en memoria, en tensores. Por lo general, se construye llamando a schema_utils.schema_from_feature_spec con una asignación de claves de función dict a los tf.io.FixedLenFeature , tf.io.VarLenFeature y tf.io.SparseFeature . Consulte la documentación de tf.parse_example para obtener más detalles.

Arriba usamos tf.io.FixedLenFeature para indicar que cada característica contiene un número fijo de valores, en este caso un único valor escalar. Dado que tf.Transform instancias por lotes, el Tensor real que representa la entidad tendrá forma (None,) donde la dimensión desconocida es la dimensión del lote.

El formato TFXIO

Con este formato, se espera que los datos estén contenidos en un pyarrow.RecordBatch . Para datos tabulares, nuestra implementación de Apache Beam acepta Arrow RecordBatch es que consta de columnas de los siguientes tipos:

  • pa.list_(<primitive>) , donde <primitive> es pa.int64() , pa.float32() pa.binary() o pa.large_binary() .

  • pa.large_list(<primitive>)

El conjunto de datos de entrada de juguete que usamos anteriormente, cuando se representa como un RecordBatch , tiene el siguiente aspecto:

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

Similar a DatasetMetadata que se necesita para acompañar el formato de " tfxio.TensorAdapterConfig instancia", se necesita un tfxio.TensorAdapterConfig para acompañar el RecordBatch es. Consiste en el esquema Arrow de RecordBatch es y TensorRepresentations para determinar de forma única cómo las columnas en RecordBatch es pueden interpretarse como TensorFlow Tensors (incluidos, entre otros, tf.Tensor, tf.SparseTensor).

TensorRepresentations es un Dict[Text, TensorRepresentation] que establece la relación entre un tensor que acepta preprocessing_fn y las columnas de RecordBatch . Por ejemplo:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

Significa que las inputs['x'] en preprocessing_fn deben ser un tf.Tensor denso, cuyos valores provienen de una columna de nombre 'col1' en la entrada RecordBatch es, y su forma (por lotes) debe ser [batch_size, 2] .

TensorRepresentation es un Protobuf definido en TensorFlow Metadata .

Entrada y salida con Apache Beam

Hasta ahora, hemos visto datos de entrada y salida en listas de Python (de RecordBatch es o diccionarios de instancia). Esta es una simplificación que se basa en la capacidad de Apache Beam para trabajar con listas, así como en su principal representación de datos, PCollection .

Una PCollection es una representación de datos que forma parte de una canalización de Beam. Una canalización de Beam se forma aplicando varios PTransform , incluidos AnalyzeDataset y TransformDataset , y ejecutando la canalización. Una PCollection no se crea en la memoria del binario principal, sino que se distribuye entre los trabajadores (aunque esta sección usa el modo de ejecución en memoria).

Pre enlatados- PCollection fuentes ( TFXIO )

El formato RecordBatch que acepta nuestra implementación es un formato común que aceptan otras bibliotecas TFX. Por lo tanto, TFX ofrece "fuentes" convenientes (también TFXIO como TFXIO ) que leen archivos de varios formatos en el disco y producen RecordBatch es y también pueden proporcionar TensorAdapterConfig , incluidas TensorRepresentations inferidas.

Esos TFXIO se pueden encontrar en el paquete tfx_bsl ( tfx_bsl.public.tfxio ).

Ejemplo: conjunto de datos "Ingresos del censo"

El siguiente ejemplo requiere leer y escribir datos en el disco y representar los datos como una PCollection (no como una lista), consulte: census_example.py . A continuación, mostramos cómo descargar los datos y ejecutar este ejemplo. El conjunto de datos "Ingresos del censo" es proporcionado por el Repositorio de aprendizaje automático de la UCI . Este conjunto de datos contiene datos categóricos y numéricos.

Los datos están en formato CSV, aquí están las dos primeras líneas:

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

Las columnas del conjunto de datos son categóricas o numéricas. Este conjunto de datos describe un problema de clasificación: predecir la última columna donde el individuo gana más o menos de 50.000 por año. Sin embargo, desde la perspectiva de tf.Transform , esta etiqueta es solo otra columna categórica.

Usamos un TFXIO predefinido, BeamRecordCsvTFXIO para traducir las líneas CSV a RecordBatches . TFXIO requiere dos datos importantes:

  • un esquema de metadatos de TensorFlow que contiene información sobre el tipo y la forma de cada columna CSV. TensorRepresentation s son una parte opcional del esquema; si no se proporcionan (que es el caso en este ejemplo), se deducirán de la información de tipo y forma. Se puede obtener el esquema mediante el uso de una función auxiliar que proporcionamos para traducir de las especificaciones de análisis de TF (que se muestran en este ejemplo) o ejecutando TensorFlow Data Validation .

  • una lista de nombres de columna, en el orden en que aparecen en el archivo CSV. Tenga en cuenta que esos nombres deben coincidir con los nombres de las funciones en el esquema.

En este ejemplo permitimos que falte la función de education-num . Esto significa que se representa como tf.io.VarLenFeature en feature_spec, y como tf.SparseTensor en preprocessing_fn . Otras características se convertirán en tf.Tensor s del mismo nombre en preprocessing_fn .

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

Tenga en cuenta que tuvimos que hacer algunas reparaciones adicionales después de leer las líneas CSV. De lo contrario, podríamos confiar en el CsvTFXIO para manejar tanto la lectura de los archivos como la traducción a RecordBatch es:

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

El preprocesamiento es similar al ejemplo anterior, excepto que la función de preprocesamiento se genera mediante programación en lugar de especificar manualmente cada columna. En la función de preprocesamiento a continuación, NUMERICAL_COLUMNS y CATEGORICAL_COLUMNS son listas que contienen los nombres de las columnas numéricas y categóricas:

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

Una diferencia con el ejemplo anterior es que la columna de etiqueta especifica manualmente la asignación de la cadena a un índice. Entonces, '>50' se asigna a 0 y '<=50K' se asigna a 1 porque es útil saber qué índice en el modelo entrenado corresponde a qué etiqueta.

La variable record_batches representa una PCollection de pyarrow.RecordBatch es. El tensor_adapter_config viene dado por csv_tfxio , que se infiere de SCHEMA (y en última instancia, en este ejemplo, de las especificaciones de análisis de TF).

La etapa final es escribir los datos transformados en el disco y tiene una forma similar a la de leer los datos sin procesar. El esquema utilizado para hacer esto es parte de la salida de AnalyzeAndTransformDataset que infiere un esquema para los datos de salida. El código para escribir en el disco se muestra a continuación. El esquema es parte de los metadatos, pero usa los dos indistintamente en la API tf.Transform (es decir, pasa los metadatos al ExampleProtoCoder ). Tenga en cuenta que esto escribe en un formato diferente. En lugar de textio.WriteToText , use el soporte integrado de Beam para el formato TFRecord y use un codificador para codificar los datos como protos de Example . Este es un formato mejor para usar en la capacitación, como se muestra en la siguiente sección. transformed_eval_data_base proporciona el nombre de archivo base para los fragmentos individuales que se escriben.

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

Además de los datos de entrenamiento, transform_fn también se escribe con los metadatos:

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

Ejecute toda la canalización de Beam con p.run().wait_until_finish() . Hasta este punto, la canalización de Beam representa un cálculo distribuido diferido. Proporciona instrucciones sobre lo que se hará, pero las instrucciones no se han ejecutado. Esta última llamada ejecuta la canalización especificada.

Descarga el conjunto de datos del censo

Descargue el conjunto de datos del censo con los siguientes comandos de shell:

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

Cuando ejecute el script census_example.py , pase el directorio que contiene estos datos como primer argumento. El script crea un subdirectorio temporal para agregar los datos preprocesados.

Integrar con TensorFlow Training

La sección final de census_example.py muestra cómo se utilizan los datos preprocesados ​​para entrenar un modelo. Consulte la documentación de Estimadores para obtener más detalles. El primer paso es construir un Estimator que requiere una descripción de las columnas preprocesadas. Cada columna numérica se describe como una real_valued_column que es una envoltura alrededor de un vector denso con un tamaño fijo ( 1 en este ejemplo). Cada columna categórica se asigna de cadena a números enteros y luego se pasa a indicator_column . tft.TFTransformOutput se utiliza para encontrar la ruta del archivo de vocabulario para cada característica categórica.

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

El siguiente paso es crear un constructor para generar la función de entrada para entrenamiento y evaluación. Se diferencia del entrenamiento utilizado por tf.Learn ya que no se requiere una especificación de característica para analizar los datos transformados. En su lugar, utilice los metadatos de los datos transformados para generar una especificación de función.

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

El código restante es el mismo que usa la clase Estimator . El ejemplo también contiene código para exportar el modelo en formato SavedModel . Tensorflow Serving o Cloud ML Engine pueden usar el modelo exportado.