Samouczek dotyczący komponentu funkcyjnego TFX Python

Ten notatnik zawiera przykłady tworzenia i uruchamiania komponentów funkcji Pythona w ramach TFX InteractiveContext oraz w lokalnie zorganizowanym potoku TFX.

Aby uzyskać więcej kontekstu i informacji, zobacz komponenty funkcji Custom Pythona strony na stronie dokumentacji TFX.

Ustawiać

Najpierw zainstalujemy TFX i zaimportujemy niezbędne moduły. TFX wymaga Pythona 3.

Sprawdź systemową wersję Pythona

import sys
sys.version
'3.7.5 (default, Feb 23 2021, 13:22:40) \n[GCC 8.4.0]'

Ulepsz Pip

Aby uniknąć aktualizacji Pip w systemie uruchomionym lokalnie, upewnij się, że działamy w Colab. Systemy lokalne można oczywiście aktualizować oddzielnie.

try:
  import colab
  !pip install --upgrade pip
except:
  pass

Zainstaluj TFX

pip install -U tfx

Czy uruchomiłeś ponownie środowisko wykonawcze?

Jeśli używasz Google Colab, przy pierwszym uruchomieniu powyższej komórki musisz ponownie uruchomić środowisko wykonawcze (Runtime > Restart runtime ...). Wynika to ze sposobu, w jaki Colab ładuje paczki.

Importuj paczki

Importujemy TFX i sprawdzamy jego wersję.

# Check version
from tfx import v1 as tfx
tfx.__version__
'1.4.0'

Niestandardowe komponenty funkcyjne Pythona

W tej sekcji stworzymy komponenty z funkcji Pythona. Nie będziemy robić żadnego prawdziwego problemu z ML — te proste funkcje służą tylko do zilustrowania procesu tworzenia komponentów funkcyjnych w Pythonie.

Zobacz funkcji Python guide komponentowego w oparciu o więcej dokumentacji.

Twórz niestandardowe komponenty Pythona

Zaczynamy od napisania funkcji, która generuje pewne fikcyjne dane. Jest to zapisywane w jego własnym pliku modułu Pythona.

%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py

Następnie piszemy drugi komponent, który wykorzystuje wytworzone dane fikcyjne. Po prostu obliczymy hash danych i zwrócimy je.

%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculate."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py

Uruchom w notebooku z InteractiveContext

Teraz zademonstrujemy użycie naszych nowych komponentów w TFX InteractiveContext.

Aby uzyskać więcej informacji na temat tego, co można zrobić z notebookiem TFX InteractiveContext, zobacz w notebook TFX Keras Komponent Tutorial .

from my_generator import MyGenerator
from my_consumer import MyConsumer

Skonstruuj InteractiveContext

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/metadata.sqlite.

Uruchomić komponent interaktywnie z context.run()

Następnie uruchamiamy nasze komponenty interaktywnie w notebooku z context.run() . Nasz komponent konsumencki wykorzystuje wyjścia komponentu generatora.

generator = MyGenerator()
context.run(generator)
WARNING: Logging before InitGoogleLogging() is written to STDERR
I1205 10:37:04.765872 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)
I1205 10:37:04.808555 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Po wykonaniu możemy sprawdzić zawartość artefaktu wyjściowego „haszującego” komponentu konsumenckiego na dysku.

tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1

To wszystko, a teraz napisałeś i wykonałeś własne niestandardowe komponenty!

Napisz definicję potoku

Następnie utworzymy potok przy użyciu tych samych komponentów. Podczas korzystania z InteractiveContext w notebooku działa dobrze dla eksperymentów, określając rurociągu pozwala wdrożyć rurociągu na lokalnym lub zdalnym biegaczy do użytku produkcyjnego.

Tutaj zademonstrujemy użycie LocalDagRunner działającego lokalnie na twoim komputerze. Do wykonywania produkcji bardziej odpowiednie mogą być moduły uruchamiające Airflow lub Kubeflow.

Zbuduj rurociąg

import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

Uruchomić rurociąg z LocalDagRunner

tfx.orchestration.LocalDagRunner().run(my_pipeline)
I1205 10:37:04.983860 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.990442 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.996665 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.003470 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.013659 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.031374 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.048280 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.067972 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Możemy sprawdzić artefakty wyjściowe generowane przez to wykonanie potoku.

find {PIPELINE_ROOT}
/tmp/tmpydmun02b
/tmp/tmpydmun02b/metadata.sqlite
/tmp/tmpydmun02b/MyConsumer
/tmp/tmpydmun02b/MyConsumer/.system
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution/2
/tmp/tmpydmun02b/MyConsumer/hash
/tmp/tmpydmun02b/MyConsumer/hash/2
/tmp/tmpydmun02b/MyConsumer/hash/2/value
/tmp/tmpydmun02b/MyGenerator
/tmp/tmpydmun02b/MyGenerator/data
/tmp/tmpydmun02b/MyGenerator/data/1
/tmp/tmpydmun02b/MyGenerator/data/1/data_file.txt
/tmp/tmpydmun02b/MyGenerator/.system
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution/1

Napisałeś teraz własne niestandardowe komponenty i zorganizowałeś ich wykonanie w LocalDagRunner! Na kolejnych etapach, sprawdź dodatkowe samouczki i przewodniki na stronie TFX .