Construire un pipeline TFX localement

TFX facilite l'orchestration de votre workflow d'apprentissage automatique (ML) sous forme de pipeline, afin de :

  • Automatisez votre processus ML, ce qui vous permet de recycler, d'évaluer et de déployer régulièrement votre modèle.
  • Créez des pipelines ML qui incluent une analyse approfondie des performances des modèles et la validation des modèles nouvellement formés pour garantir les performances et la fiabilité.
  • Surveillez les données de formation pour déceler les anomalies et éliminez les biais de diffusion de formation
  • Augmentez la vitesse de l’expérimentation en exécutant un pipeline avec différents ensembles d’hyperparamètres.

Un processus typique de développement de pipeline commence sur une machine locale, avec l'analyse des données et la configuration des composants, avant d'être déployé en production. Ce guide décrit deux manières de créer un pipeline localement.

  • Personnalisez un modèle de pipeline TFX pour l'adapter aux besoins de votre flux de travail ML. Les modèles de pipeline TFX sont des flux de travail prédéfinis qui démontrent les meilleures pratiques utilisant les composants standard TFX.
  • Créez un pipeline à l'aide de TFX. Dans ce cas d'utilisation, vous définissez un pipeline sans partir d'un modèle.

Lorsque vous développez votre pipeline, vous pouvez l'exécuter avec LocalDagRunner . Ensuite, une fois les composants du pipeline bien définis et testés, vous utiliserez un orchestrateur de niveau production tel que Kubeflow ou Airflow.

Avant que tu commences

TFX est un package Python, vous devrez donc configurer un environnement de développement Python, tel qu'un environnement virtuel ou un conteneur Docker. Alors:

pip install tfx

Si vous débutez avec les pipelines TFX, découvrez-en davantage sur les concepts de base des pipelines TFX avant de continuer.

Créer un pipeline à l'aide d'un modèle

Les modèles de pipeline TFX facilitent le démarrage du développement de pipelines en fournissant un ensemble prédéfini de définitions de pipeline que vous pouvez personnaliser pour votre cas d'utilisation.

Les sections suivantes décrivent comment créer une copie d'un modèle et la personnaliser pour répondre à vos besoins.

Créer une copie du modèle de pipeline

  1. Voir la liste des modèles de pipeline TFX disponibles :

    tfx template list
    
  2. Sélectionnez un modèle dans la liste

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Remplacez les éléments suivants :

    • template : Le nom du modèle que vous souhaitez copier.
    • pipeline-name : Le nom du pipeline à créer.
    • destination-path : Le chemin dans lequel copier le modèle.

    En savoir plus sur la commande tfx template copy .

  3. Une copie du modèle de pipeline a été créée au chemin que vous avez spécifié.

Explorez le modèle de pipeline

Cette section fournit une présentation de l'échafaudage créé par un modèle.

  1. Explorez les répertoires et fichiers copiés dans le répertoire racine de votre pipeline

    • Un répertoire de pipeline avec
      • pipeline.py - définit le pipeline et répertorie les composants utilisés
      • configs.py - contient les détails de configuration tels que la provenance des données ou l'orchestrateur utilisé
    • Un répertoire de données
      • Celui-ci contient généralement un fichier data.csv , qui est la source par défaut pour ExampleGen . Vous pouvez modifier la source de données dans configs.py .
    • Un répertoire de modèles avec du code de prétraitement et des implémentations de modèles

    • Le modèle copie les exécuteurs DAG pour l'environnement local et Kubeflow.

    • Certains modèles incluent également des blocs-notes Python afin que vous puissiez explorer vos données et artefacts avec les métadonnées de Machine Learning.

  2. Exécutez les commandes suivantes dans votre répertoire de pipeline :

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    La commande crée une exécution de pipeline à l'aide LocalDagRunner , qui ajoute les répertoires suivants à votre pipeline :

    • Un répertoire tfx_metadata qui contient le magasin ML Metadata utilisé localement.
    • Un répertoire tfx_pipeline_output qui contient les sorties de fichiers du pipeline.
  3. Ouvrez le fichier pipeline/configs.py de votre pipeline et examinez le contenu. Ce script définit les options de configuration utilisées par le pipeline et les fonctions des composants. C'est ici que vous spécifierez des éléments tels que l'emplacement de la source de données ou le nombre d'étapes de formation dans une exécution.

  4. Ouvrez le fichier pipeline/pipeline.py de votre pipeline et examinez son contenu. Ce script crée le pipeline TFX. Initialement, le pipeline contient uniquement un composant ExampleGen .

    • Suivez les instructions dans les commentaires TODO dans pipeline.py pour ajouter d'autres étapes au pipeline.
  5. Ouvrez le fichier local_runner.py et examinez le contenu. Ce script crée une exécution de pipeline et spécifie les paramètres de l'exécution, tels que data_path et preprocessing_fn .

  6. Vous avez examiné l'échafaudage créé par le modèle et créé une exécution de pipeline à l'aide de LocalDagRunner . Ensuite, personnalisez le modèle en fonction de vos besoins.

Personnalisez votre pipeline

Cette section fournit un aperçu de la façon de commencer à personnaliser votre modèle.

  1. Concevez votre pipeline. L'échafaudage fourni par un modèle vous aide à implémenter un pipeline pour les données tabulaires à l'aide des composants standard TFX. Si vous déplacez un workflow ML existant vers un pipeline, vous devrez peut-être réviser votre code pour utiliser pleinement les composants standard TFX . Vous devrez peut-être également créer des composants personnalisés qui implémentent des fonctionnalités uniques à votre flux de travail ou qui ne sont pas encore prises en charge par les composants standard TFX.

  2. Une fois que vous avez conçu votre pipeline, personnalisez-le de manière itérative en suivant le processus suivant. Commencez par le composant qui ingère les données dans votre pipeline, qui est généralement le composant ExampleGen .

    1. Personnalisez le pipeline ou un composant en fonction de votre cas d'utilisation. Ces personnalisations peuvent inclure des modifications telles que :

      • Modification des paramètres du pipeline.
      • Ajouter des composants au pipeline ou les supprimer.
      • Remplacement de la source d'entrée de données. Cette source de données peut être un fichier ou des requêtes vers des services tels que BigQuery.
      • Modification de la configuration d'un composant dans le pipeline.
      • Modification de la fonction de personnalisation d'un composant.
    2. Exécutez le composant localement à l'aide du script local_runner.py ou d'un autre exécuteur DAG approprié si vous utilisez un autre orchestrateur. Si le script échoue, déboguez l'échec et réessayez d'exécuter le script.

    3. Une fois cette personnalisation fonctionnelle, passez à la personnalisation suivante.

  3. En travaillant de manière itérative, vous pouvez personnaliser chaque étape du flux de travail du modèle pour répondre à vos besoins.

Créer un pipeline personnalisé

Utilisez les instructions suivantes pour en savoir plus sur la création d'un pipeline personnalisé sans utiliser de modèle.

  1. Concevez votre pipeline. Les composants standard TFX offrent des fonctionnalités éprouvées pour vous aider à mettre en œuvre un flux de travail ML complet. Si vous déplacez un workflow ML existant vers un pipeline, vous devrez peut-être réviser votre code pour utiliser pleinement les composants standard TFX. Vous devrez peut-être également créer des composants personnalisés qui implémentent des fonctionnalités telles que l'augmentation des données.

  2. Créez un fichier de script pour définir votre pipeline à l'aide de l'exemple suivant. Ce guide fait référence à ce fichier sous le nom de my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    Dans les étapes à venir, vous définirez votre pipeline dans create_pipeline et exécuterez votre pipeline localement à l'aide du programme d'exécution local.

    Créez votre pipeline de manière itérative en suivant le processus suivant.

    1. Personnalisez le pipeline ou un composant en fonction de votre cas d'utilisation. Ces personnalisations peuvent inclure des modifications telles que :

      • Modification des paramètres du pipeline.
      • Ajouter des composants au pipeline ou les supprimer.
      • Remplacement d'un fichier d'entrée de données.
      • Modification de la configuration d'un composant dans le pipeline.
      • Modification de la fonction de personnalisation d'un composant.
    2. Exécutez le composant localement à l'aide du programme d'exécution local ou en exécutant directement le script. Si le script échoue, déboguez l'échec et réessayez d'exécuter le script.

    3. Une fois cette personnalisation fonctionnelle, passez à la personnalisation suivante.

    Commencez par le premier nœud du flux de travail de votre pipeline, généralement le premier nœud ingère les données dans votre pipeline.

  3. Ajoutez le premier nœud de votre workflow à votre pipeline. Dans cet exemple, le pipeline utilise le composant standard ExampleGen pour charger un CSV à partir d'un répertoire dans ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen crée des exemples d'enregistrements sérialisés en utilisant les données du CSV au chemin de données spécifié. En définissant le paramètre input_base du composant CsvExampleGen avec la racine des données.

  4. Créez un répertoire data dans le même répertoire que my_pipeline.py . Ajoutez un petit fichier CSV au répertoire data .

  5. Utilisez la commande suivante pour exécuter votre script my_pipeline.py .

    python my_pipeline.py
    

    Le résultat devrait ressembler à ce qui suit :

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Continuez à ajouter des composants de manière itérative à votre pipeline.