Tutoriel du composant de fonction Python TFX

Ce bloc-notes contient des exemples sur la façon de créer et d'exécuter des composants de fonction Python dans TFX InteractiveContext et dans un pipeline TFX orchestré localement.

Pour plus de contexte et d' information, voir les composants de la fonction Python personnalisée page sur le site de documentation TFX.

Installer

Nous allons d'abord installer TFX et importer les modules nécessaires. TFX nécessite Python 3.

Vérifiez la version Python du système

import sys
sys.version
'3.7.5 (default, Feb 23 2021, 13:22:40) \n[GCC 8.4.0]'

Pip de mise à niveau

Pour éviter de mettre à niveau Pip dans un système lors de l'exécution locale, assurez-vous que nous exécutons dans Colab. Les systèmes locaux peuvent bien sûr être mis à niveau séparément.

try:
  import colab
  !pip install --upgrade pip
except:
  pass

Installer TFX

pip install -U tfx

As-tu redémarré le runtime ?

Si vous utilisez Google Colab, la première fois que vous exécutez la cellule ci-dessus, vous devez redémarrer le runtime (Runtime > Redémarrer le runtime...). Cela est dû à la façon dont Colab charge les packages.

Importer des packages

Nous importons TFX et vérifions sa version.

# Check version
from tfx import v1 as tfx
tfx.__version__
'1.4.0'

Composants de fonction Python personnalisés

Dans cette section, nous allons créer des composants à partir de fonctions Python. Nous ne ferons pas de vrai problème de ML - ces fonctions simples sont juste utilisées pour illustrer le processus de développement de composants de fonction Python.

Voir fonction Python Guide composante pour plus de documentation.

Créer des composants personnalisés Python

Nous commençons par écrire une fonction qui génère des données factices. Ceci est écrit dans son propre fichier de module Python.

%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py

Ensuite, nous écrivons un deuxième composant qui utilise les données factices produites. Nous allons simplement calculer le hachage des données et le renvoyer.

%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculate."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py

Exécuter dans un ordinateur portable avec InteractiveContext

Maintenant, nous allons démontrer l'utilisation de nos nouveaux composants dans le TFX InteractiveContext.

Pour plus d' informations sur ce que vous pouvez faire avec le bloc - notes TFX InteractiveContext, consultez le portable en TFX Keras Component Tutorial .

from my_generator import MyGenerator
from my_consumer import MyConsumer

Construire le InteractiveContext

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/metadata.sqlite.

Exécutez votre composant de manière interactive avec context.run()

Ensuite, nous courons nos composants de manière interactive dans le bloc - notes avec context.run() . Notre composant consommateur utilise les sorties du composant générateur.

generator = MyGenerator()
context.run(generator)
WARNING: Logging before InitGoogleLogging() is written to STDERR
I1205 10:37:04.765872 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)
I1205 10:37:04.808555 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Après l'exécution, nous pouvons inspecter le contenu de l'artefact de sortie "hachage" du composant consommateur sur le disque.

tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmp/tfx-interactive-2021-12-05T10_37_04.715534-3q0k1y0m/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1

C'est tout, et vous avez maintenant écrit et exécuté vos propres composants personnalisés !

Écrire une définition de pipeline

Ensuite, nous allons créer un pipeline en utilisant ces mêmes composants. Tout en utilisant le InteractiveContext dans un ordinateur portable fonctionne bien pour l' expérimentation, la définition d' un pipeline vous permet de déployer votre pipeline sur les coureurs locaux ou distants pour l' utilisation de la production.

Ici, nous allons démontrer l'utilisation de LocalDagRunner exécuté localement sur votre machine. Pour l'exécution de la production, les glissières Airflow ou Kubeflow peuvent être plus adaptées.

Construire un pipeline

import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

Exécutez votre pipeline avec le LocalDagRunner

tfx.orchestration.LocalDagRunner().run(my_pipeline)
I1205 10:37:04.983860 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.990442 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:04.996665 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.003470 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.013659 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.031374 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.048280 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I1205 10:37:05.067972 28682 rdbms_metadata_access_object.cc:686] No property is defined for the Type

Nous pouvons inspecter les artefacts de sortie générés par cette exécution de pipeline.

find {PIPELINE_ROOT}
/tmp/tmpydmun02b
/tmp/tmpydmun02b/metadata.sqlite
/tmp/tmpydmun02b/MyConsumer
/tmp/tmpydmun02b/MyConsumer/.system
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution
/tmp/tmpydmun02b/MyConsumer/.system/executor_execution/2
/tmp/tmpydmun02b/MyConsumer/hash
/tmp/tmpydmun02b/MyConsumer/hash/2
/tmp/tmpydmun02b/MyConsumer/hash/2/value
/tmp/tmpydmun02b/MyGenerator
/tmp/tmpydmun02b/MyGenerator/data
/tmp/tmpydmun02b/MyGenerator/data/1
/tmp/tmpydmun02b/MyGenerator/data/1/data_file.txt
/tmp/tmpydmun02b/MyGenerator/.system
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution
/tmp/tmpydmun02b/MyGenerator/.system/executor_execution/1

Vous avez maintenant écrit vos propres composants personnalisés et orchestré leur exécution sur le LocalDagRunner ! Pour les prochaines étapes, consultez des tutoriels et des guides supplémentaires sur le site de TFX .