Construyendo una canalización TFX localmente

TFX facilita la organización de su flujo de trabajo de aprendizaje automático (ML) como una canalización, con el fin de:

  • Automatice su proceso de aprendizaje automático, lo que le permite volver a capacitar, evaluar e implementar su modelo con regularidad.
  • Cree canalizaciones de ML que incluyan un análisis profundo del rendimiento del modelo y la validación de modelos recién entrenados para garantizar el rendimiento y la confiabilidad.
  • Supervise los datos de entrenamiento en busca de anomalías y elimine el sesgo de servicio de entrenamiento
  • Aumente la velocidad de la experimentación ejecutando una canalización con diferentes conjuntos de hiperparámetros.

Un proceso de desarrollo de canalización típico comienza en una máquina local, con análisis de datos y configuración de componentes, antes de implementarse en producción. Esta guía describe dos formas de crear una canalización localmente.

  • Personalice una plantilla de canalización TFX para adaptarse a las necesidades de su flujo de trabajo de aprendizaje automático. Las plantillas de canalización TFX son flujos de trabajo prediseñados que demuestran las mejores prácticas utilizando los componentes estándar TFX.
  • Construya una canalización usando TFX. En este caso de uso, define una canalización sin comenzar desde una plantilla.

Como usted está desarrollando su tubería, se puede ejecutar con LocalDagRunner . Luego, una vez que los componentes de la canalización se hayan definido y probado bien, usaría un orquestador de nivel de producción como Kubeflow o Airflow.

Antes de que empieces

TFX es un paquete de Python, por lo que deberá configurar un entorno de desarrollo de Python, como un entorno virtual o un contenedor Docker. Luego:

pip install tfx

Si usted es nuevo en las tuberías TFX, aprender más acerca de los conceptos básicos para tuberías TFX antes de continuar.

Construye una canalización usando una plantilla

Las plantillas de canalización TFX facilitan la iniciación al desarrollo de canalización al proporcionar un conjunto prediseñado de definiciones de canalización que puede personalizar para su caso de uso.

Las siguientes secciones describen cómo crear una copia de una plantilla y personalizarla para satisfacer sus necesidades.

Cree una copia de la plantilla de canalización

  1. Consulte la lista de plantillas de canalización TFX disponibles:

    tfx template list
    
  2. Seleccione una plantilla de la lista

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Reemplazar lo siguiente:

    • template : El nombre de la plantilla que desea copiar.
    • pipeline-name : El nombre de la tubería para crear.
    • destination-path : La ruta para copiar la plantilla en.

    Más información sobre la tfx template copy comando .

  3. Se ha creado una copia de la plantilla de canalización en la ruta que especificó.

Explore la plantilla de canalización

Esta sección proporciona una descripción general del andamio creado por una plantilla.

  1. Explore los directorios y archivos que se copiaron al directorio raíz de su canalización

    • Un directorio de tubería con
      • pipeline.py - define la tubería, y se están utilizando listas que los componentes
      • configs.py - Hold detalles de configuración tales como donde los datos proviene de o que Orchestrator está siendo utilizado
    • Un directorio de datos
      • Esto normalmente contiene un data.csv archivo, que es la fuente por defecto para ExampleGen . Puede cambiar la fuente de datos en configs.py .
    • Un directorio con los modelos de código y modelo de pre-procesamiento implementaciones

    • La plantilla copia los corredores de DAG para el entorno local y Kubeflow.

    • Algunas plantillas también incluyen Python Notebooks para que pueda explorar sus datos y artefactos con Machine Learning MetaData.

  2. Ejecute los siguientes comandos en su directorio de canalización:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    El comando crea una corrida de tuberías utilizando LocalDagRunner , que agrega los siguientes directorios a su tubería:

    • Un directorio tfx_metadata que contiene el almacén de metadatos ML utilizado localmente.
    • Un directorio que contiene tfx_pipeline_output salidas de archivos de la tubería.
  3. Abra su tubería de pipeline/configs.py archivo y revisar el contenido. Este script define las opciones de configuración utilizadas por la canalización y las funciones del componente. Aquí es donde debe especificar cosas como la ubicación de la fuente de datos o la cantidad de pasos de entrenamiento en una ejecución.

  4. Abra su tubería de pipeline/pipeline.py archivo y revisar el contenido. Este script crea la canalización TFX. Inicialmente, el gasoducto contiene sólo un ExampleGen componente.

    • Siga las instrucciones de los comentarios TODO en pipeline.py añadir más pasos para la tubería.
  5. Abrir local_runner.py presentar y examinar el contenido. Este script crea una corrida de tuberías y especifica los parámetros de la carrera, como el data_path y preprocessing_fn .

  6. Ha revisado el andamiaje creado por la plantilla y ha creado una carrera de tuberías utilizando LocalDagRunner . A continuación, personalice la plantilla para que se ajuste a sus necesidades.

Personaliza tu canalización

Esta sección proporciona una descripción general de cómo comenzar a personalizar su plantilla.

  1. Diseñe su tubería. El andamiaje que proporciona una plantilla le ayuda a implementar una canalización para datos tabulares utilizando los componentes estándar TFX. Si va a mover un flujo de trabajo ML existente en una tubería, puede que tenga que revisar su código para hacer pleno uso de TFX componentes estándar . También puede ser necesario para crear componentes personalizados que implementan características que son únicas a su flujo de trabajo o que aún no son compatibles con TFX componentes estándar.

  2. Una vez que haya diseñado su canalización, personalice iterativamente la canalización mediante el siguiente proceso. Empezar desde el componente de datos que ingiere en su oleoducto, que suele ser el ExampleGen componente.

    1. Personalice la canalización o un componente para que se adapte a su caso de uso. Estas personalizaciones pueden incluir cambios como:

      • Cambiar los parámetros de la canalización.
      • Agregar componentes a la tubería o eliminarlos.
      • Reemplazo de la fuente de entrada de datos. Esta fuente de datos puede ser un archivo o consultas a servicios como BigQuery.
      • Cambiar la configuración de un componente en la canalización.
      • Cambiar la función de personalización de un componente.
    2. Ejecutar el componente de forma local mediante la local_runner.py guión, u otro corredor DAG apropiada si está utilizando un orquestador diferente. Si la secuencia de comandos falla, depure la falla y vuelva a intentar ejecutar la secuencia de comandos.

    3. Una vez que esta personalización esté funcionando, pase a la siguiente personalización.

  3. Trabajando de forma iterativa, puede personalizar cada paso en el flujo de trabajo de la plantilla para satisfacer sus necesidades.

Construye una canalización personalizada

Utilice las siguientes instrucciones para obtener más información sobre cómo crear una canalización personalizada sin utilizar una plantilla.

  1. Diseñe su tubería. Los componentes estándar de TFX brindan una funcionalidad probada para ayudarlo a implementar un flujo de trabajo de ML completo. Si está moviendo un flujo de trabajo de AA existente a una canalización, es posible que deba revisar su código para hacer un uso completo de los componentes estándar de TFX. También puede ser necesario para crear componentes personalizados que implementan características tales como aumento de datos.

  2. Cree un archivo de secuencia de comandos para definir su canalización con el siguiente ejemplo. En esta guía se hace referencia a este archivo como my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    En los próximos pasos, a definir su oleoducto en create_pipeline y ejecutar la canalización de forma local mediante el corredor local.

    Cree iterativamente su canalización mediante el siguiente proceso.

    1. Personalice la canalización o un componente para que se adapte a su caso de uso. Estas personalizaciones pueden incluir cambios como:

      • Cambiar los parámetros de la canalización.
      • Agregar componentes a la tubería o eliminarlos.
      • Reemplazo de un archivo de entrada de datos.
      • Cambiar la configuración de un componente en la canalización.
      • Cambiar la función de personalización de un componente.
    2. Ejecute el componente localmente usando el corredor local o ejecutando el script directamente. Si la secuencia de comandos falla, depure la falla y vuelva a intentar ejecutar la secuencia de comandos.

    3. Una vez que esta personalización esté funcionando, pase a la siguiente personalización.

    Comience desde el primer nodo en el flujo de trabajo de su canalización, normalmente el primer nodo ingiere datos en su canalización.

  3. Agregue el primer nodo en su flujo de trabajo a su canalización. En este ejemplo, la tubería utiliza el ExampleGen componente estándar para cargar un archivo CSV en un directorio telefónico ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen crea serializado registros ejemplo utilizando los datos de la CSV en la ruta de datos especificado. Al establecer el CsvExampleGen del componente input_base parámetro con la raíz de datos.

  4. Crear un data de directorio en el mismo directorio que my_pipeline.py . Añadir un pequeño archivo CSV a la data del directorio.

  5. Utilice el siguiente comando para ejecutar su my_pipeline.py guión.

    python my_pipeline.py
    

    El resultado debería ser similar al siguiente:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Continúe agregando componentes de manera iterativa a su canalización.