Construindo um Pipeline TFX localmente

O TFX torna mais fácil orquestrar seu fluxo de trabalho de aprendizado de máquina (ML) como um pipeline, para:

  • Automatize seu processo de ML, o que permite treinar, avaliar e implantar regularmente seu modelo.
  • Crie pipelines de ML que incluem uma análise profunda do desempenho do modelo e validação de modelos recém-treinados para garantir o desempenho e a confiabilidade.
  • Monitore os dados de treinamento em busca de anomalias e elimine a distorção do serviço de treinamento
  • Aumente a velocidade de experimentação executando um pipeline com diferentes conjuntos de hiperparâmetros.

Um processo típico de desenvolvimento de pipeline começa em uma máquina local, com análise de dados e configuração de componentes, antes de ser implantado na produção. Este guia descreve duas maneiras de construir um pipeline localmente.

  • Personalize um modelo de pipeline TFX para atender às necessidades de seu fluxo de trabalho de ML. Os modelos de pipeline TFX são fluxos de trabalho pré-construídos que demonstram as melhores práticas usando os componentes padrão TFX.
  • Construa um pipeline usando TFX. Nesse caso de uso, você define um pipeline sem iniciar a partir de um modelo.

Como você está desenvolvendo seu pipeline, você pode executá-lo com LocalDagRunner . Então, depois que os componentes do pipeline foram bem definidos e testados, você usaria um orquestrador de nível de produção, como Kubeflow ou Airflow.

Antes de você começar

TFX é um pacote Python, então você precisará configurar um ambiente de desenvolvimento Python, como um ambiente virtual ou um contêiner Docker. Então:

pip install tfx

Se você é novo para pipelines TFX, aprender mais sobre os conceitos fundamentais para condutas TFX antes de continuar.

Construir um pipeline usando um modelo

Os modelos de pipeline TFX facilitam o início do desenvolvimento de pipeline, fornecendo um conjunto pré-construído de definições de pipeline que você pode personalizar para seu caso de uso.

As seções a seguir descrevem como criar uma cópia de um modelo e personalizá-lo para atender às suas necessidades.

Crie uma cópia do modelo de pipeline

  1. Veja a lista dos modelos de pipeline TFX disponíveis:

    tfx template list
    
  2. Selecione um modelo da lista

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Substitua o seguinte:

    • template : O nome do modelo que você deseja copiar.
    • pipeline-name : O nome do gasoduto para criar.
    • destination-path : O caminho para copiar o modelo para.

    Saiba mais sobre a tfx template copy comando .

  3. Uma cópia do modelo de pipeline foi criada no caminho que você especificou.

Explore o modelo de pipeline

Esta seção fornece uma visão geral da estrutura criada por um modelo.

  1. Explore os diretórios e arquivos que foram copiados para o diretório raiz do pipeline

    • Um diretório de pipeline com
      • pipeline.py - define o gasoduto, e listas de quais componentes estão sendo usados
      • configs.py detalhes de configuração de retenção, tais como onde os dados são provenientes de ou que orquestrador está sendo usado -
    • Um diretório de dados
      • Isso normalmente contém uma data.csv arquivo, que é a fonte padrão para ExampleGen . Você pode alterar a fonte de dados em configs.py .
    • Um diretório de modelos com código e modelo de pré-processamento implementações

    • O modelo copia os executores DAG para o ambiente local e Kubeflow.

    • Alguns modelos também incluem Notebooks Python para que você possa explorar seus dados e artefatos com MetaDados de aprendizado de máquina.

  2. Execute os seguintes comandos no diretório do pipeline:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    O comando cria uma corrida gasoduto usando LocalDagRunner , que adiciona os seguintes diretórios para seu pipeline:

    • Um diretório tfx_metadata que contém o armazenamento ML Metadados usado localmente.
    • Um diretório tfx_pipeline_output que contém saídas de arquivo do gasoduto.
  3. Abra o seu pipeline de pipeline/configs.py arquivo e revisar o conteúdo. Este script define as opções de configuração usadas pelo pipeline e as funções do componente. É aqui que você especifica coisas como a localização da fonte de dados ou o número de etapas de treinamento em uma execução.

  4. Abra o seu pipeline de pipeline/pipeline.py arquivo e revisar o conteúdo. Este script cria o pipeline TFX. Inicialmente, o gasoduto contém apenas um ExampleGen componente.

    • Siga as instruções nos comentários TODO em pipeline.py para adicionar mais passos para o pipeline.
  5. Abrir local_runner.py arquivo e revisão do conteúdo. Este script cria uma corrida gasoduto e especifica os parâmetros do funcionamento, tais como a data_path e preprocessing_fn .

  6. Você analisou o andaime criado pelo modelo e criou uma corrida gasoduto usando LocalDagRunner . Em seguida, personalize o modelo para atender aos seus requisitos.

Personalize seu pipeline

Esta seção fornece uma visão geral de como começar a personalizar seu modelo.

  1. Projete seu pipeline. A estrutura que um modelo fornece ajuda a implementar um pipeline para dados tabulares usando os componentes padrão TFX. Se você estiver movendo um fluxo de trabalho ML existente em um gasoduto, você pode precisar de rever o seu código para fazer pleno uso de componentes padrão TFX . Você também pode precisar para criar componentes personalizados que implementam características que são únicas para o seu fluxo de trabalho ou que ainda não são suportados por componentes padrão TFX.

  2. Depois de projetar seu pipeline, personalize iterativamente o pipeline usando o seguinte processo. Iniciar a partir do componente que ingere dados em seu pipeline, que é geralmente o ExampleGen componente.

    1. Personalize o pipeline ou um componente para se adequar ao seu caso de uso. Essas personalizações podem incluir alterações como:

      • Alterar os parâmetros do pipeline.
      • Adicionar componentes ao pipeline ou removê-los.
      • Substituindo a fonte de entrada de dados. Essa fonte de dados pode ser um arquivo ou consultas em serviços como o BigQuery.
      • Alterar a configuração de um componente no pipeline.
      • Alterar a função de personalização de um componente.
    2. Executar o componente localmente usando o local_runner.py script ou outro corredor DAG adequado se você estiver usando um orquestrador diferente. Se o script falhar, depure a falha e tente executar o script novamente.

    3. Quando essa personalização estiver funcionando, vá para a próxima personalização.

  3. Trabalhando iterativamente, você pode personalizar cada etapa do fluxo de trabalho do modelo para atender às suas necessidades.

Construir um pipeline personalizado

Use as instruções a seguir para saber mais sobre como construir um pipeline personalizado sem usar um modelo.

  1. Projete seu pipeline. Os componentes padrão TFX fornecem funcionalidade comprovada para ajudá-lo a implementar um fluxo de trabalho de ML completo. Se você estiver movendo um fluxo de trabalho de ML existente para um pipeline, pode ser necessário revisar seu código para fazer uso total dos componentes padrão do TFX. Você também pode precisar para criar componentes personalizados que implementam funcionalidades, tais como o aumento de dados.

  2. Crie um arquivo de script para definir seu pipeline usando o exemplo a seguir. Este guia refere-se a este ficheiro como my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    Nas próximas etapas, você define seu pipeline em create_pipeline e executar o seu pipeline localmente usando o corredor local.

    Construa iterativamente seu pipeline usando o seguinte processo.

    1. Personalize o pipeline ou um componente para se adequar ao seu caso de uso. Essas personalizações podem incluir alterações como:

      • Alterar os parâmetros do pipeline.
      • Adicionar componentes ao pipeline ou removê-los.
      • Substituindo um arquivo de entrada de dados.
      • Alterar a configuração de um componente no pipeline.
      • Alterar a função de personalização de um componente.
    2. Execute o componente localmente usando o executor local ou executando o script diretamente. Se o script falhar, depure a falha e tente executar o script novamente.

    3. Quando essa personalização estiver funcionando, vá para a próxima personalização.

    Comece do primeiro nó no fluxo de trabalho do pipeline; normalmente, o primeiro nó ingere dados no pipeline.

  3. Adicione o primeiro nó do fluxo de trabalho ao pipeline. Neste exemplo, o gasoduto usa o ExampleGen componente padrão para carregar um arquivo CSV de um diretório em ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen cria serializados exemplo registos utilizando os dados na CSV no caminho de dados especificado. Ao definir o CsvExampleGen do componente input_base parâmetro com a raiz de dados.

  4. Criar uma data diretório no mesmo diretório que my_pipeline.py . Adicionar um pequeno arquivo CSV com o data do diretório.

  5. Use o seguinte comando para executar o seu my_pipeline.py script.

    python my_pipeline.py
    

    O resultado deve ser algo como o seguinte:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Continue a adicionar componentes iterativamente ao pipeline.