Treten Sie der SIG TFX-Addons-Community bei und helfen Sie, TFX noch besser zu machen!

Tutorial zur TFX Python-Funktionskomponente

Dieses Notizbuch enthält Beispiele zum Erstellen und Ausführen von Python-Funktionskomponenten im TFX InteractiveContext und in einer lokal orchestrierten TFX-Pipeline.

Weitere Informationen zu Kontext und Informationen finden Sie auf der Seite Benutzerdefinierte Python-Funktionskomponenten auf der TFX-Dokumentationssite.

Einrichten

Wir werden zuerst TFX installieren und die erforderlichen Module importieren. TFX benötigt Python 3.

Überprüfen Sie die Python-Version des Systems

import sys
sys.version
'3.7.5 (default, Feb 23 2021, 13:22:40) \n[GCC 8.4.0]'

Upgrade Pip

Um zu vermeiden, dass Pip in einem System aktualisiert wird, wenn es lokal ausgeführt wird, stellen Sie sicher, dass wir in Colab ausgeführt werden. Lokale Systeme können natürlich separat aktualisiert werden.

try:
  import colab
  !pip install -q --upgrade pip
except:
  pass

Installieren Sie TFX

pip install -q -U tfx

Haben Sie die Laufzeit neu gestartet?

Wenn Sie Google Colab verwenden und die obige Zelle zum ersten Mal ausführen, müssen Sie die Laufzeit neu starten (Laufzeit> Laufzeit neu starten ...). Dies liegt an der Art und Weise, wie Colab Pakete lädt.

Pakete importieren

Wir importieren TFX und überprüfen seine Version.

# Check version
from tfx import v1 as tfx
tfx.__version__
WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.
'0.30.0'

Benutzerdefinierte Python-Funktionskomponenten

In diesem Abschnitt erstellen wir Komponenten aus Python-Funktionen. Wir werden kein echtes ML-Problem lösen - diese einfachen Funktionen werden nur verwendet, um den Entwicklungsprozess der Python-Funktionskomponenten zu veranschaulichen.

Weitere Dokumentation finden Sie im Python-Handbuch für funktionsbasierte Komponenten .

Erstellen Sie benutzerdefinierte Python-Komponenten

Wir beginnen mit dem Schreiben einer Funktion, die einige Dummy-Daten generiert. Dies wird in eine eigene Python-Moduldatei geschrieben.

%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py

Als nächstes schreiben wir eine zweite Komponente, die die erzeugten Dummy-Daten verwendet. Wir werden nur den Hash der Daten berechnen und zurückgeben.

%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculate."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py

Führen Sie das In-Notebook mit dem InteractiveContext aus

Jetzt werden wir die Verwendung unserer neuen Komponenten im TFX InteractiveContext demonstrieren.

Weitere Informationen dazu, was Sie mit dem TFX-Notebook InteractiveContext tun können, finden Sie im TFX-Keras-Komponenten-Tutorial im Notebook.

from my_generator import MyGenerator
from my_consumer import MyConsumer

Konstruieren Sie den InteractiveContext

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2021-05-19T21_06_14.642885-8dzuv3xn as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2021-05-19T21_06_14.642885-8dzuv3xn/metadata.sqlite.

Führen Sie Ihre Komponente interaktiv mit context.run()

Als Nächstes führen wir unsere Komponenten interaktiv im Notizbuch mit context.run() . Unsere Verbraucherkomponente verwendet die Ausgänge der Generatorkomponente.

generator = MyGenerator()
context.run(generator)
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)

Nach der Ausführung können wir den Inhalt des "Hash" -Ausgabeartefakts der Verbraucherkomponente auf der Festplatte überprüfen.

tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmp/tfx-interactive-2021-05-19T21_06_14.642885-8dzuv3xn/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1

Das war's und Sie haben jetzt Ihre eigenen benutzerdefinierten Komponenten geschrieben und ausgeführt!

Schreiben Sie eine Pipeline-Definition

Als nächstes werden wir eine Pipeline mit denselben Komponenten erstellen. Während die Verwendung des InteractiveContext in einem Notizbuch gut zum Experimentieren geeignet ist, können Sie durch Definieren einer Pipeline Ihre Pipeline auf lokalen oder Remote-Läufern für die Verwendung in der Produktion bereitstellen.

Hier zeigen wir die Verwendung des LocalDagRunner, der lokal auf Ihrem Computer ausgeführt wird. Für die Produktionsausführung sind möglicherweise die Airflow- oder Kubeflow-Läufer besser geeignet.

Baue eine Pipeline

import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

Führen Sie Ihre Pipeline mit dem LocalDagRunner

tfx.orchestration.LocalDagRunner().run(my_pipeline)
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/tmp/tmpascn04df.json', '--HistoryManager.hist_file=:memory:']
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/tmp/tmpascn04df.json', '--HistoryManager.hist_file=:memory:']

Wir können die durch diese Pipeline-Ausführung erzeugten Ausgabeartefakte untersuchen.

find {PIPELINE_ROOT}
/tmp/tmpdui18w8v
/tmp/tmpdui18w8v/metadata.sqlite
/tmp/tmpdui18w8v/MyConsumer
/tmp/tmpdui18w8v/MyConsumer/.system
/tmp/tmpdui18w8v/MyConsumer/.system/executor_execution
/tmp/tmpdui18w8v/MyConsumer/.system/executor_execution/2
/tmp/tmpdui18w8v/MyConsumer/hash
/tmp/tmpdui18w8v/MyConsumer/hash/2
/tmp/tmpdui18w8v/MyConsumer/hash/2/value
/tmp/tmpdui18w8v/MyGenerator
/tmp/tmpdui18w8v/MyGenerator/data
/tmp/tmpdui18w8v/MyGenerator/data/1
/tmp/tmpdui18w8v/MyGenerator/data/1/data_file.txt
/tmp/tmpdui18w8v/MyGenerator/.system
/tmp/tmpdui18w8v/MyGenerator/.system/executor_execution
/tmp/tmpdui18w8v/MyGenerator/.system/executor_execution/1

Sie haben jetzt Ihre eigenen benutzerdefinierten Komponenten geschrieben und deren Ausführung auf dem LocalDagRunner orchestriert! Weitere Tutorials und Anleitungen finden Sie auf der TFX-Website .