Treten Sie der SIG TFX-Addons-Community bei und helfen Sie mit, TFX noch besser zu machen! SIG TFX-Addons beitreten

Lokaler Aufbau einer TFX-Pipeline

TFX erleichtert die Orchestrierung Ihres Machine Learning (ML)-Workflows als Pipeline, um:

  • Automatisieren Sie Ihren ML-Prozess, sodass Sie Ihr Modell regelmäßig neu trainieren, bewerten und bereitstellen können.
  • Erstellen Sie ML-Pipelines, die eine eingehende Analyse der Modellleistung und die Validierung neu trainierter Modelle umfassen, um Leistung und Zuverlässigkeit sicherzustellen.
  • Überwachen Sie die Trainingsdaten auf Anomalien und eliminieren Sie Trainings-Serving-Schiefe
  • Erhöhen Sie die Experimentiergeschwindigkeit, indem Sie eine Pipeline mit verschiedenen Hyperparametersätzen ausführen.

Ein typischer Pipeline-Entwicklungsprozess beginnt auf einem lokalen Computer mit der Datenanalyse und dem Einrichten der Komponenten, bevor er in der Produktion bereitgestellt wird. In diesem Handbuch werden zwei Möglichkeiten zum lokalen Erstellen einer Pipeline beschrieben.

  • Passen Sie eine TFX-Pipeline-Vorlage an die Anforderungen Ihres ML-Workflows an. TFX-Pipeline-Vorlagen sind vorgefertigte Workflows, die Best Practices unter Verwendung der TFX-Standardkomponenten demonstrieren.
  • Erstellen Sie eine Pipeline mit TFX. In diesem Anwendungsfall definieren Sie eine Pipeline, ohne von einer Vorlage auszugehen.

Während Sie Ihre Pipeline entwickeln, können Sie sie mit LocalDagRunner . Nachdem die Pipelinekomponenten gut definiert und getestet wurden, verwenden Sie einen produktionstauglichen Orchestrator wie Kubeflow oder Airflow.

Bevor Sie beginnen

TFX ist ein Python-Paket, daher müssen Sie eine Python-Entwicklungsumgebung einrichten, z. B. eine virtuelle Umgebung oder einen Docker-Container. Dann:

pip install tfx

Wenn Sie neu bei TFX-Pipelines sind, erfahren Sie mehr über die Kernkonzepte für TFX-Pipelines, bevor Sie fortfahren.

Erstellen Sie eine Pipeline mit einer Vorlage

TFX-Pipeline-Vorlagen erleichtern den Einstieg in die Pipeline-Entwicklung, indem sie einen vorgefertigten Satz von Pipeline-Definitionen bereitstellen, die Sie für Ihren Anwendungsfall anpassen können.

In den folgenden Abschnitten wird beschrieben, wie Sie eine Kopie einer Vorlage erstellen und diese an Ihre Anforderungen anpassen.

Erstellen Sie eine Kopie der Pipeline-Vorlage

  1. Siehe Liste der verfügbaren TFX-Pipeline-Vorlagen:

    tfx template list
    
  2. Wählen Sie eine Vorlage aus der Liste

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    Ersetzen Sie Folgendes:

    • template : Der Name der Vorlage, die Sie kopieren möchten.
    • pipeline-name : Der Name der zu erstellenden Pipeline.
    • destination-path : Der Pfad, in den die Vorlage kopiert werden soll.

    Erfahren Sie mehr über den tfx template copy .

  3. Eine Kopie der Pipelinevorlage wurde unter dem von Ihnen angegebenen Pfad erstellt.

Entdecken Sie die Pipeline-Vorlage

Dieser Abschnitt bietet einen Überblick über das von einer Vorlage erstellte Gerüst.

  1. Untersuchen Sie die Verzeichnisse und Dateien, die in das Stammverzeichnis Ihrer Pipeline kopiert wurden

    • Ein Pipeline- Verzeichnis mit

      • pipeline.py - definiert die Pipeline und listet auf, welche Komponenten verwendet werden

      • configs.py - enthält Konfigurationsdetails, z. B. woher die Daten kommen oder welcher Orchestrator verwendet wird

    • Ein Datenverzeichnis

      • Diese enthält normalerweise eine data.csv Datei, die die Standardquelle für ExampleGen . Sie können die Datenquelle in configs.py .
    • Ein Modell Verzeichnis mit Vorverarbeitung Code und Modell - Implementierungen

    • Die Vorlage kopiert DAG-Runner für die lokale Umgebung und Kubeflow.

    • Einige Vorlagen enthalten auch Python Notebooks, damit Sie Ihre Daten und Artefakte mit Machine Learning-Metadaten untersuchen können.

  2. Führen Sie die folgenden Befehle in Ihrem Pipelineverzeichnis aus:

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    Der Befehl erstellt eine Pipelineausführung mit LocalDagRunner , wodurch die folgenden Verzeichnisse zu Ihrer Pipeline LocalDagRunner :

    • Ein tfx_metadata- Verzeichnis, das den lokal verwendeten ML-Metadatenspeicher enthält.
    • Ein tfx_pipeline_output- Verzeichnis, das die Dateiausgaben der Pipeline enthält.
  3. Öffnen Sie die Datei " pipeline/configs.py Ihrer Pipeline und überprüfen Sie den Inhalt. Dieses Skript definiert die Konfigurationsoptionen, die von der Pipeline und den Komponentenfunktionen verwendet werden. Hier geben Sie Dinge wie den Speicherort der Datenquelle oder die Anzahl der Trainingsschritte in einem Lauf an.

  4. Öffnen Sie die Datei " pipeline/pipeline.py Ihrer Pipeline und überprüfen Sie den Inhalt. Dieses Skript erstellt die TFX-Pipeline. Anfänglich enthält die Pipeline nur eine ExampleGen Komponente.

    • Folgen Sie den Anweisungen in den TODO- Kommentaren in pipeline.py , um der Pipeline weitere Schritte hinzuzufügen.
  5. Öffnen local_runner.py Datei local_runner.py und überprüfen Sie den Inhalt. Dieses Skript erstellt eine Pipeline laufen und legt die Parameter des Lauf, wie die data_path und preprocessing_fn .

  6. Sie haben das von der Vorlage erstellte Gerüst überprüft und mit LocalDagRunner eine Pipelineausführung LocalDagRunner . Passen Sie als Nächstes die Vorlage an Ihre Anforderungen an.

Passen Sie Ihre Pipeline an

Dieser Abschnitt bietet einen Überblick über die ersten Schritte beim Anpassen Ihrer Vorlage.

  1. Gestalten Sie Ihre Pipeline. Das von einer Vorlage bereitgestellte Gerüst unterstützt Sie bei der Implementierung einer Pipeline für tabellarische Daten unter Verwendung der TFX-Standardkomponenten. Wenn Sie einen vorhandenen ML-Workflow in eine Pipeline verschieben, müssen Sie möglicherweise Ihren Code überarbeiten, um die TFX-Standardkomponenten vollständig nutzen zu können. Möglicherweise müssen Sie auch benutzerdefinierte Komponenten erstellen, die Funktionen implementieren, die für Ihren Workflow einzigartig sind oder die noch nicht von TFX-Standardkomponenten unterstützt werden.

  2. Nachdem Sie Ihre Pipeline entworfen haben, passen Sie die Pipeline iterativ mit dem folgenden Prozess an. Beginnen Sie mit der Komponente, die Daten in Ihre Pipeline aufnimmt, bei der es sich normalerweise um die ExampleGen Komponente handelt.

    1. Passen Sie die Pipeline oder eine Komponente an Ihren Anwendungsfall an. Diese Anpassungen können Änderungen umfassen wie:

      • Ändern von Pipeline-Parametern.
      • Hinzufügen von Komponenten zur Pipeline oder Entfernen von ihnen.
      • Ersetzen der Dateneingabequelle. Bei dieser Datenquelle kann es sich entweder um eine Datei oder um Abfragen bei Diensten wie BigQuery handeln.

      • Ändern der Konfiguration einer Komponente in der Pipeline.

      • Ändern der Anpassungsfunktion einer Komponente.

    2. Führen Sie die Komponente lokal mit dem Skript local_runner.py oder einem anderen geeigneten DAG-Runner aus, wenn Sie einen anderen Orchestrator verwenden. Wenn das Skript fehlschlägt, debuggen Sie den Fehler und versuchen Sie erneut, das Skript auszuführen.

    3. Sobald diese Anpassung funktioniert, fahren Sie mit der nächsten Anpassung fort.

  3. Iterativ können Sie jeden Schritt im Vorlagenworkflow an Ihre Anforderungen anpassen.

Erstellen Sie eine benutzerdefinierte Pipeline

Verwenden Sie die folgenden Anweisungen, um mehr über das Erstellen einer benutzerdefinierten Pipeline ohne Verwendung einer Vorlage zu erfahren.

  1. Gestalten Sie Ihre Pipeline. Die TFX-Standardkomponenten bieten bewährte Funktionen, die Sie bei der Implementierung eines vollständigen ML-Workflows unterstützen. Wenn Sie einen vorhandenen ML-Workflow in eine Pipeline verschieben, müssen Sie möglicherweise Ihren Code überarbeiten, um die TFX-Standardkomponenten vollständig nutzen zu können. Möglicherweise müssen Sie auch benutzerdefinierte Komponenten erstellen, die Funktionen wie die Datenerweiterung implementieren.

  2. Erstellen Sie anhand des folgenden Beispiels eine Skriptdatei, um Ihre Pipeline zu definieren. In dieser Anleitung wird diese Datei als my_pipeline.py .

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    In den nächsten Schritten definieren Sie Ihre Pipeline in create_pipeline und führen Ihre Pipeline lokal mit dem lokalen Runner aus.

    Erstellen Sie Ihre Pipeline iterativ mit dem folgenden Prozess.

    1. Passen Sie die Pipeline oder eine Komponente an Ihren Anwendungsfall an. Diese Anpassungen können Änderungen umfassen wie:

      • Ändern von Pipeline-Parametern.
      • Komponenten zur Pipeline hinzufügen oder entfernen.
      • Ersetzen einer Dateneingabedatei.
      • Ändern der Konfiguration einer Komponente in der Pipeline.
      • Ändern der Anpassungsfunktion einer Komponente.
    2. Führen Sie die Komponente lokal mit dem lokalen Runner oder durch direktes Ausführen des Skripts aus. Wenn das Skript fehlschlägt, debuggen Sie den Fehler und versuchen Sie erneut, das Skript auszuführen.

    3. Sobald diese Anpassung funktioniert, fahren Sie mit der nächsten Anpassung fort.

    Beginnen Sie mit dem ersten Knoten im Workflow Ihrer Pipeline. Normalerweise nimmt der erste Knoten Daten in Ihre Pipeline auf.

  3. Fügen Sie Ihrer Pipeline den ersten Knoten in Ihrem Workflow hinzu. In diesem Beispiel verwendet die Pipeline die ExampleGen Standardkomponente, um eine CSV- ./data aus einem Verzeichnis unter ./data zu ./data .

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
      def run_pipeline():
        my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          data_path=DATA_PATH,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen erstellt serialisierte Beispieldatensätze unter Verwendung der Daten in der CSV- CsvExampleGen im angegebenen Datenpfad. Durch die Einstellung der CsvExampleGen der Komponente input_base Parameter mit der Datenwurzel.

  4. Erstellen Sie ein data im selben Verzeichnis wie my_pipeline.py . Fügen Sie dem data eine kleine CSV-Datei hinzu.

  5. Verwenden Sie den folgenden Befehl, um Ihr my_pipeline.py Skript my_pipeline.py .

    python my_pipeline.py
    

    Das Ergebnis sollte in etwa wie folgt aussehen:

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. Fügen Sie Ihrer Pipeline weiterhin iterativ Komponenten hinzu.