Ayuda a proteger la Gran Barrera de Coral con TensorFlow en Kaggle Únete Challenge

El componente de canalización ExampleGen TFX

El componente ExampleGen TFX Pipeline ingiere datos en las tuberías TFX. Consume archivos / servicios externos para generar ejemplos que serán leídos por otros componentes TFX. También proporciona una partición coherente y configurable, y baraja el conjunto de datos para las mejores prácticas de AA.

  • Consume: datos de fuentes de datos externas como CSV, TFRecord , Avro, Parquet y BigQuery.
  • Emite: registros tf.Example , registros tf.SequenceExample o formato proto, según el formato de carga útil.

ExampleGen y otros componentes

ExampleGen proporciona datos a los componentes que utilizan la biblioteca de validación de datos de TensorFlow , como SchemaGen , StatisticsGen y Example Validator . También proporciona datos a Transform , que hace uso de la biblioteca TensorFlow Transform y, en última instancia, a los objetivos de implementación durante la inferencia.

Fuentes y formatos de datos

Actualmente, una instalación estándar de TFX incluye componentes completos de ExampleGen para estas fuentes y formatos de datos:

También hay disponibles ejecutores personalizados que permiten el desarrollo de componentes ExampleGen para estas fuentes y formatos de datos:

Vea los ejemplos de uso en el código fuente y esta discusión para obtener más información sobre cómo usar y desarrollar ejecutores personalizados.

Además, estos formatos y fuentes de datos están disponibles como ejemplos de componentes personalizados :

Ingesta de formatos de datos que son compatibles con Apache Beam

Apache Beam admite la ingesta de datos de una amplia gama de fuentes y formatos de datos ( consulte a continuación ). Estas capacidades se pueden utilizar para crear componentes de ExampleGen personalizados para TFX, lo que se demuestra con algunos componentes de ExampleGen existentes ( ver más abajo ).

Cómo utilizar un componente ExampleGen

Para las fuentes de datos compatibles (actualmente, archivos CSV, archivos TFRecord con formato tf.Example , tf.SequenceExample y proto, y resultados de consultas de BigQuery), el componente de canalización ExampleGen se puede usar directamente en la implementación y requiere poca personalización. Por ejemplo:

example_gen = CsvExampleGen(input_base='data_root')

o como a continuación para importar TFRecord externo con tf.Example directamente:

example_gen = ImportExampleGen(input_base=path_to_tfrecord_dir)

Span, versión y división

Un Span es una agrupación de ejemplos de formación. Si sus datos se conservan en un sistema de archivos, cada tramo puede almacenarse en un directorio separado. La semántica de un Span no está codificada en TFX; un intervalo puede corresponder a un día de datos, una hora de datos o cualquier otro grupo que sea significativo para su tarea.

Cada tramo puede contener múltiples versiones de datos. Para dar un ejemplo, si elimina algunos ejemplos de un Span para limpiar datos de baja calidad, esto podría resultar en una nueva Versión de ese Span. De forma predeterminada, los componentes TFX operan en la última versión dentro de un intervalo.

Cada versión dentro de un intervalo se puede subdividir en múltiples divisiones. El caso de uso más común para dividir un Span es dividirlo en datos de entrenamiento y de evaluación.

Tramos y divisiones

División de entrada / salida personalizada

Para personalizar la relación de división de tren / evaluación que dará como resultado ExampleGen, establezca output_config para el componente ExampleGen. Por ejemplo:

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Observe cómo se configuraron los hash_buckets en este ejemplo.

Para una fuente de entrada que ya se ha dividido, configure input_config para el componente ExampleGen:


# Input train split is 'input_dir/train/*', eval split is 'input_dir/eval/*'.
# Output splits are generated one-to-one mapping from input splits.
input = proto.Input(splits=[
                example_gen_pb2.Input.Split(name='train', pattern='train/*'),
                example_gen_pb2.Input.Split(name='eval', pattern='eval/*')
            ])
example_gen = CsvExampleGen(input_base=input_dir, input_config=input)

Para el gen de ejemplo basado en archivos (por ejemplo, CsvExampleGen e ImportExampleGen), el pattern es un patrón de archivo relativo global que se asigna a los archivos de entrada con el directorio raíz proporcionado por la ruta base de entrada. Para la generación de ejemplos basada en consultas (p. Ej., BigQueryExampleGen, PrestoExampleGen), el pattern es una consulta SQL.

De forma predeterminada, todo el directorio base de entrada se trata como una sola división de entrada, y la división de salida de entrenamiento y evaluación se genera con una proporción de 2: 1.

Consulte proto / example_gen.proto para ver la configuración de división de entrada y salida de ExampleGen. Y consulte la guía de componentes descendentes para utilizar las divisiones personalizadas descendentes.

Método de división

Cuando se usa el método de división hash_buckets , en lugar de todo el registro, se puede usar una función para dividir los ejemplos. Si una función está presente, ExampleGen usará una huella digital de esa función como clave de partición.

Esta característica se puede utilizar para mantener una división estable con ciertas propiedades de los ejemplos: por ejemplo, un usuario siempre se colocará en la misma división si se seleccionó "user_id" como el nombre de la característica de la partición.

La interpretación de lo que significa una "característica" y cómo hacer coincidir una "característica" con el nombre especificado depende de la implementación de ExampleGen y del tipo de ejemplos.

Para implementaciones de ExampleGen listas para usar:

  • Si genera tf.Example, entonces una "característica" significa una entrada en tf.Example.features.feature.
  • Si genera tf.SequenceExample, entonces una "característica" significa una entrada en tf.SequenceExample.context.feature.
  • Solo se admiten las funciones int64 y bytes.

En los siguientes casos, ExampleGen arroja errores de tiempo de ejecución:

  • El nombre de la característica especificada no existe en el ejemplo.
  • Característica vacía: tf.train.Feature() .
  • Tipos de características no admitidas, por ejemplo, características flotantes.

Para generar la división train / eval según una característica de los ejemplos, configure output_config para el componente ExampleGen. Por ejemplo:

# Input has a single split 'input_dir/*'.
# Output 2 splits based on 'user_id' features: train:eval=3:1.
output = proto.Output(
             split_config=proto.SplitConfig(splits=[
                 proto.SplitConfig.Split(name='train', hash_buckets=3),
                 proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ],
             partition_feature_name='user_id'))
example_gen = CsvExampleGen(input_base=input_dir, output_config=output)

Observe cómo se configuró el nombre de función de la partition_feature_name en este ejemplo.

Lapso

El intervalo se puede recuperar utilizando la especificación '{SPAN}' en el patrón global de entrada :

  • Esta especificación coincide con los dígitos y asigna los datos a los números de SPAN relevantes. Por ejemplo, 'data_ {SPAN} - *. Tfrecord' recopilará archivos como 'data_12-a.tfrecord', 'date_12-b.tfrecord'.
  • Opcionalmente, esta especificación se puede especificar con el ancho de los enteros cuando se mapea. Por ejemplo, 'data_ {SPAN: 2} .file' se asigna a archivos como 'data_02.file' y 'data_27.file' (como entradas para Span-2 y Span-27 respectivamente), pero no se asigna a 'data_1. file 'ni' data_123.file '.
  • Cuando falta la especificación SPAN, se asume que siempre es Span '0'.
  • Si se especifica SPAN, la canalización procesará el último tramo y almacenará el número de tramo en metadatos.

Por ejemplo, supongamos que hay datos de entrada:

  • '/ tmp / span-1 / train / data'
  • '/ tmp / span-1 / eval / data'
  • '/ tmp / span-2 / train / data'
  • '/ tmp / span-2 / eval / data'

y la configuración de entrada se muestra a continuación:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/eval/*'
}

al activar la canalización, procesará:

  • '/ tmp / span-2 / train / data' como división de tren
  • '/ tmp / span-2 / eval / data' como eval split

con el número de tramo como '2'. Si más adelante '/ tmp / span-3 / ...' están listos, simplemente active la tubería nuevamente y tomará el span '3' para su procesamiento. A continuación se muestra el ejemplo de código para usar la especificación de intervalo:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

La recuperación de un determinado intervalo se puede realizar con RangeConfig, que se detalla a continuación.

Fecha

Si su fuente de datos está organizada en el sistema de archivos por fecha, TFX admite el mapeo de fechas directamente para abarcar números. Hay tres especificaciones para representar la asignación de fechas a intervalos: {AAAA}, {MM} y {DD}:

  • Las tres especificaciones deben estar presentes en el patrón global de entrada si se especifica alguna:
  • Se puede especificar exclusivamente la especificación {SPAN} o este conjunto de especificaciones de fecha.
  • Se calcula una fecha de calendario con el año desde AAAA, el mes desde MM y el día del mes desde DD, luego el número de intervalo se calcula como el número de días desde la época Unix (es decir, 1970-01-01). Por ejemplo, 'log- {YYYY} {MM} {DD} .data' coincide con un archivo 'log-19700101.data' y lo consume como entrada para Span-0, y 'log-20170101.data' como entrada para Span-17167.
  • Si se especifica este conjunto de especificaciones de fecha, la canalización procesará la última fecha más reciente y almacenará el número de intervalo correspondiente en los metadatos.

Por ejemplo, supongamos que hay datos de entrada organizados por fecha de calendario:

  • '/ tmp / 1970-01-02 / tren / datos'
  • '/ tmp / 1970-01-02 / eval / data'
  • '/ tmp / 1970-01-03 / tren / datos'
  • '/ tmp / 1970-01-03 / eval / data'

y la configuración de entrada se muestra a continuación:

splits {
  name: 'train'
  pattern: '{YYYY}-{MM}-{DD}/train/*'
}
splits {
  name: 'eval'
  pattern: '{YYYY}-{MM}-{DD}/eval/*'
}

al activar la canalización, procesará:

  • '/ tmp / 1970-01-03 / train / data' como tren dividido
  • '/ tmp / 1970-01-03 / eval / data' como división de evaluación

con el número de tramo como '2'. Si más tarde '/ tmp / 1970-01-04 / ...' están listos, simplemente active la tubería nuevamente y tomará el tramo '3' para su procesamiento. A continuación se muestra el ejemplo de código para usar la especificación de fecha:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Versión

La versión se puede recuperar usando la especificación '{VERSION}' en el patrón global de entrada :

  • Esta especificación coincide con los dígitos y asigna los datos a los números de VERSIÓN relevantes bajo SPAN. Tenga en cuenta que la especificación de versión se puede utilizar en combinación con la especificación de período o fecha.
  • Esta especificación también se puede especificar opcionalmente con el ancho de la misma manera que la especificación SPAN. por ejemplo, 'intervalo- {SPAN} / versión- {VERSIÓN: 4} / datos- *'.
  • Cuando falta la especificación VERSION, la versión se establece en Ninguna.
  • Si se especifican SPAN y VERSION, la canalización procesará la última versión para el último intervalo y almacenará el número de versión en metadatos.
  • Si se especifica VERSION, pero no SPAN (o especificación de fecha), se generará un error.

Por ejemplo, supongamos que hay datos de entrada:

  • '/ tmp / span-1 / ver-1 / train / data'
  • '/ tmp / span-1 / ver-1 / eval / data'
  • '/ tmp / span-2 / ver-1 / train / data'
  • '/ tmp / span-2 / ver-1 / eval / data'
  • '/ tmp / span-2 / ver-2 / train / data'
  • '/ tmp / span-2 / ver-2 / eval / data'

y la configuración de entrada se muestra a continuación:

splits {
  name: 'train'
  pattern: 'span-{SPAN}/ver-{VERSION}/train/*'
}
splits {
  name: 'eval'
  pattern: 'span-{SPAN}/ver-{VERSION}/eval/*'
}

al activar la canalización, procesará:

  • '/ tmp / span-2 / ver-2 / train / data' como división de tren
  • '/ tmp / span-2 / ver-2 / eval / data' como eval split

con el número de tramo como '2' y el número de versión como '2'. Si más adelante '/ tmp / span-2 / ver-3 / ...' están listos, simplemente active la tubería nuevamente y tomará el span '2' y la versión '3' para su procesamiento. A continuación se muestra el ejemplo de código para usar las especificaciones de la versión:

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN}/ver-{VERSION}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN}/ver-{VERSION}/eval/*')
            ])
example_gen = CsvExampleGen(input_base='/tmp', input_config=input)

Configuración de rango

TFX admite la recuperación y el procesamiento de un intervalo específico en ExampleGen basado en archivos usando la configuración de rango, una configuración abstracta que se usa para describir rangos para diferentes entidades TFX. Para recuperar un intervalo específico, configure range_config para un componente ExampleGen basado en archivos. Por ejemplo, supongamos que hay datos de entrada:

  • '/ tmp / span-01 / train / data'
  • '/ tmp / span-01 / eval / data'
  • '/ tmp / span-02 / train / data'
  • '/ tmp / span-02 / eval / data'

Para recuperar y procesar datos específicamente con el intervalo '1', especificamos una configuración de rango además de la configuración de entrada. Tenga en cuenta que ExampleGen solo admite rangos estáticos de un solo tramo (para especificar el procesamiento de tramos individuales específicos). Por lo tanto, para StaticRange, start_span_number debe ser igual a end_span_number. Utilizando el intervalo proporcionado y la información del ancho del intervalo (si se proporciona) para el relleno con ceros, ExampleGen reemplazará la especificación SPAN en los patrones de división proporcionados con el número de intervalo deseado. A continuación se muestra un ejemplo de uso:

# In cases where files have zero-padding, the width modifier in SPAN spec is
# required so TFX can correctly substitute spec with zero-padded span number.
input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='span-{SPAN:2}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='span-{SPAN:2}/eval/*')
            ])
# Specify the span number to be processed here using StaticRange.
range = proto.RangeConfig(
                static_range=proto.StaticRange(
                        start_span_number=1, end_span_number=1)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/span-01/train/*' and 'input_dir/span-01/eval/*', respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

La configuración de rango también se puede usar para procesar fechas específicas, si se usa la especificación de fecha en lugar de la especificación de SPAN. Por ejemplo, supongamos que hay datos de entrada organizados por fecha de calendario:

  • '/ tmp / 1970-01-02 / tren / datos'
  • '/ tmp / 1970-01-02 / eval / data'
  • '/ tmp / 1970-01-03 / tren / datos'
  • '/ tmp / 1970-01-03 / eval / data'

Para recuperar y procesar datos específicamente el 2 de enero de 1970, hacemos lo siguiente:

from  tfx.components.example_gen import utils

input = proto.Input(splits=[
                proto.Input.Split(name='train',
                                            pattern='{YYYY}-{MM}-{DD}/train/*'),
                proto.Input.Split(name='eval',
                                            pattern='{YYYY}-{MM}-{DD}/eval/*')
            ])
# Specify date to be converted to span number to be processed using StaticRange.
span = utils.date_to_span_number(1970, 1, 2)
range = proto.RangeConfig(
                static_range=range_config_pb2.StaticRange(
                        start_span_number=span, end_span_number=span)
            )

# After substitution, the train and eval split patterns will be
# 'input_dir/1970-01-02/train/*' and 'input_dir/1970-01-02/eval/*',
# respectively.
example_gen = CsvExampleGen(input_base=input_dir, input_config=input,
                            range_config=range)

ExampleGen personalizado

Si los componentes de ExampleGen disponibles actualmente no se ajustan a sus necesidades, puede crear un ExampleGen personalizado, que le permitirá leer desde diferentes fuentes de datos o en diferentes formatos de datos.

Personalización de ExampleGen basada en archivos (experimental)

Primero, extienda BaseExampleGenExecutor con un Beam PTransform personalizado, que proporciona la conversión de su división de entrada train / eval a ejemplos TF. Por ejemplo, el ejecutor CsvExampleGen proporciona la conversión de una división CSV de entrada a ejemplos TF.

Luego, cree un componente con el ejecutor anterior, como se hizo en el componente CsvExampleGen . Alternativamente, pase un ejecutor personalizado al componente ExampleGen estándar como se muestra a continuación.

from tfx.components.base import executor_spec
from tfx.components.example_gen.csv_example_gen import executor

example_gen = FileBasedExampleGen(
    input_base=os.path.join(base_dir, 'data/simple'),
    custom_executor_spec=executor_spec.ExecutorClassSpec(executor.Executor))

Ahora, también admitimos la lectura de archivos Avro y Parquet con este método .

Formatos de datos adicionales

Apache Beam admite la lectura de varios formatos de datos adicionales . a través de transformaciones de E / S de haz. Puede crear componentes de ExampleGen personalizados aprovechando las transformaciones de E / S de Beam utilizando un patrón similar al ejemplo de Avro

  return (pipeline
          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example))

En el momento de escribir estas líneas, los formatos y las fuentes de datos admitidos actualmente para el SDK de Beam Python incluyen:

  • Amazon S3
  • Apache Avro
  • Apache Hadoop
  • Apache Kafka
  • Parquet Apache
  • Google Cloud BigQuery
  • Google Cloud BigTable
  • Google Cloud Datastore
  • Google Cloud Pub / Sub
  • Google Cloud Storage (GCS)
  • MongoDB

Consulte los documentos de Beam para obtener la lista más reciente.

Ejemplo de personalización basada en consultas (experimental)

Primero, amplíe BaseExampleGenExecutor con un Beam PTransform personalizado, que lee desde la fuente de datos externa. Luego, cree un componente simple extendiendo QueryBasedExampleGen.

Esto puede requerir o no configuraciones de conexión adicionales. Por ejemplo, el ejecutor de BigQuery lee con un conector beam.io predeterminado, que abstrae los detalles de configuración de la conexión. El ejecutor de Presto requiere un Beam PTransform personalizado y un protobuf de configuración de conexión personalizado como entrada.

Si se requiere una configuración de conexión para un componente ExampleGen personalizado, cree un nuevo protobuf y páselo a través de custom_config, que ahora es un parámetro de ejecución opcional. A continuación se muestra un ejemplo de cómo utilizar un componente configurado.

from tfx.examples.custom_components.presto_example_gen.proto import presto_config_pb2
from tfx.examples.custom_components.presto_example_gen.presto_component.component import PrestoExampleGen

presto_config = presto_config_pb2.PrestoConnConfig(host='localhost', port=8080)
example_gen = PrestoExampleGen(presto_config, query='SELECT * FROM chicago_taxi_trips')

Componentes downstream de ExampleGen

La configuración dividida personalizada es compatible con los componentes posteriores.

EstadísticaGen

El comportamiento predeterminado es realizar la generación de estadísticas para todas las divisiones.

Para excluir cualquier división, configure exclude_splits para el componente StatisticsGen. Por ejemplo:

# Exclude the 'eval' split.
statistics_gen = StatisticsGen(
             examples=example_gen.outputs['examples'],
             exclude_splits=['eval'])

SchemaGen

El comportamiento predeterminado es generar un esquema basado en todas las divisiones.

Para excluir cualquier división, establezca exclude_splits para el componente SchemaGen. Por ejemplo:

# Exclude the 'eval' split.
schema_gen = SchemaGen(
             statistics=statistics_gen.outputs['statistics'],
             exclude_splits=['eval'])

ExampleValidator

El comportamiento predeterminado es validar las estadísticas de todas las divisiones en los ejemplos de entrada con un esquema.

Para excluir cualquier división, establezca exclude_splits para el componente ExampleValidator. Por ejemplo:

# Exclude the 'eval' split.
example_validator = ExampleValidator(
             statistics=statistics_gen.outputs['statistics'],
             schema=schema_gen.outputs['schema'],
             exclude_splits=['eval'])

Transformar

El comportamiento predeterminado es analizar y producir los metadatos de la división 'tren' y transformar todas las divisiones.

Para especificar las divisiones de análisis y las divisiones de transformación, configure splits_config para el componente Transformar. Por ejemplo:

# Analyze the 'train' split and transform all splits.
transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_taxi_module_file,
      splits_config=proto.SplitsConfig(analyze=['train'],
                                               transform=['train', 'eval']))

Entrenador y sintonizador

El comportamiento predeterminado es entrenar en la división 'train' y evaluar en la división 'eval'.

Para especificar las divisiones del tren y evaluar las divisiones, configure train_args y eval_args para el componente Trainer. Por ejemplo:

# Train on the 'train' split and evaluate on the 'eval' split.
Trainer = Trainer(
      module_file=_taxi_module_file,
      examples=transform.outputs['transformed_examples'],
      schema=schema_gen.outputs['schema'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=proto.TrainArgs(splits=['train'], num_steps=10000),
      eval_args=proto.EvalArgs(splits=['eval'], num_steps=5000))

Evaluador

El comportamiento predeterminado es proporcionar métricas calculadas en la división 'eval'.

Para calcular una estadística de evaluación en divisiones personalizadas, configure example_splits para el componente Evaluator. Por ejemplo:

# Compute metrics on the 'eval1' split and the 'eval2' split.
Trainer = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      example_splits=['eval1', 'eval2'])

Hay más detalles disponibles en la referencia de la API CsvExampleGen , la referencia de la API FileBasedExampleGen y la referencia de la API ImportExampleGen .