This notebook contains an example of how to author and run Python function components within the TFX InteractiveContext and in a locally orchestrated TFX pipeline.
For more context and information, see the Custom Python function components page on the TFX documentation site.
Setup
We will first install TFX and import necessary modules. TFX requires Python 3.
Check the system Python version
import sys
sys.version
'3.9.19 (main, Apr 6 2024, 17:57:55) \n[GCC 9.4.0]'
Upgrade Pip
To avoid upgrading Pip on a local system, we first check that we are running in Colab. Local systems can of course be upgraded separately.
try:
  import colab
  !pip install --upgrade pip
except:
  pass
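The cell above detects the environment by attempting an import inside a bare try/except. As a minimal sketch of a narrower check, the standard library can report whether a module is importable without actually importing it. The helper name `module_available` is hypothetical and not part of TFX or Colab.

```python
import importlib.util


def module_available(name):
  """Return True if `name` can be imported in the current environment."""
  try:
    return importlib.util.find_spec(name) is not None
  except (ImportError, ValueError):
    # A missing parent package raises ModuleNotFoundError (an ImportError);
    # a module without a usable spec raises ValueError.
    return False


# `colab` is the module name probed by the cell above.
print(module_available('colab'))
```

Because nothing is imported, this check cannot hide unrelated errors the way a bare `except` can.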
Install TFX
!pip install tfx
Did you restart the runtime?
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime (Runtime > Restart runtime ...). This is because of the way that Colab loads packages.
Import packages
We import TFX and check its version.
# Check version
from tfx import v1 as tfx
tfx.__version__
2024-05-08 09:54:22.424419: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-08 09:54:22.424465: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-08 09:54:22.426007: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
'1.15.0'
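If only the installed version is of interest, it can also be read from the package metadata without importing the package. This is a minimal sketch using the standard library; the helper name `installed_version` is hypothetical, and it assumes the distribution is registered under the name `tfx`, as in the install command above.

```python
from importlib import metadata


def installed_version(distribution):
  """Return the installed version string of a distribution, or None."""
  try:
    return metadata.version(distribution)
  except metadata.PackageNotFoundError:
    # The distribution is not installed in this environment.
    return None


print(installed_version('tfx'))
```

Returning `None` instead of raising makes the helper convenient for a quick environment check before the heavier import.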
Custom Python function components
In this section, we will create components from Python functions. We will not be solving any real ML problem; these simple functions just illustrate the Python function component development process.
See the Python function-based component guide for more documentation.
Create Python custom components
We begin by writing a function that generates some dummy data. This is written to its own Python module file.
%%writefile my_generator.py
import os
import tensorflow as tf # Used for writing files.
from tfx import v1 as tfx
# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py
Next, we write a second component that uses the dummy data produced. We will just calculate a hash of the data and return it.
%%writefile my_consumer.py
import hashlib
import os
import tensorflow as tf
from tfx import v1 as tfx
# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String
@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculates its hash."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py
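To see the logic of the two components in isolation, the following sketch reproduces it with the standard library only. `FakeArtifact`, `generate` and `consume` are hypothetical stand-ins, not TFX APIs: plain `open` replaces `tf.io.gfile.GFile`, `str.encode` replaces `tf.compat.as_bytes`, and a dictionary replaces the artifact's custom properties.

```python
import hashlib
import os
import tempfile
from dataclasses import dataclass, field


@dataclass
class FakeArtifact:
  """Hypothetical stand-in for an artifact: a location plus properties."""
  uri: str
  custom_properties: dict = field(default_factory=dict)
  value: str = ''


def generate(data):
  """Write dummy data under data.uri and attach a custom property."""
  with open(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')
  data.custom_properties['my_custom_field'] = 'my_custom_value'


def consume(data, hash_out, algorithm='sha256'):
  """Hash the file written by generate() and copy the custom property."""
  with open(os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(contents.encode('utf-8'))
  hash_out.value = h.hexdigest()
  hash_out.custom_properties['input_custom_field'] = (
      data.custom_properties['my_custom_field'])


with tempfile.TemporaryDirectory() as tmp:
  data = FakeArtifact(uri=tmp)
  result = FakeArtifact(uri=tmp)
  generate(data)
  consume(data, result, algorithm='md5')

print(result.value)
print(result.custom_properties)
```

The `algorithm` argument is passed straight to `hashlib.new`, so any algorithm name that `hashlib` supports can be selected without changing the function body.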
Run in-notebook with the InteractiveContext
Now, we will demonstrate usage of our new components in the TFX InteractiveContext.
For more information on what you can do with the TFX notebook InteractiveContext, see the in-notebook TFX Keras Component Tutorial.
from my_generator import MyGenerator
from my_consumer import MyConsumer
Construct the InteractiveContext
# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmpfs/tmp/tfx-interactive-2024-05-08T09_54_27.937649-5yvglrqg as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmpfs/tmp/tfx-interactive-2024-05-08T09_54_27.937649-5yvglrqg/metadata.sqlite.
Run your component interactively with context.run()
Next, we run our components interactively within the notebook with context.run(). Our consumer component uses the outputs of the generator component.
generator = MyGenerator()
context.run(generator)
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)
After execution, we can inspect the contents of the "hash" output artifact of the consumer component on disk.
!tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmpfs/tmp/tfx-interactive-2024-05-08T09_54_27.937649-5yvglrqg/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1
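The same file can also be read from Python instead of through a shell command. The sketch below assumes, as the output above suggests, that the artifact URI points at a plain-text file on the local filesystem. The helper name `read_text_artifact` is hypothetical, and a temporary file stands in for the real artifact.

```python
import os
import tempfile


def read_text_artifact(uri):
  """Return the stripped text content of the file at `uri`."""
  with open(uri, 'r') as f:
    return f.read().strip()


# Demonstration with a temporary file standing in for the artifact's URI.
with tempfile.TemporaryDirectory() as tmp:
  demo_uri = os.path.join(tmp, 'value')
  with open(demo_uri, 'w') as f:
    f.write('0015fe7975d1a2794b59aa12635703f1')
  demo_value = read_text_artifact(demo_uri)

print(demo_value)
```

In the notebook, the argument would be the same expression used in the shell command, `consumer.outputs['hash'].get()[0].uri`.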
That's it! You have now written and executed your own custom components.
Write a pipeline definition
Next, we will author a pipeline using these same components. While using the InteractiveContext within a notebook works well for experimentation, defining a pipeline lets you deploy your pipeline on local or remote runners for production usage.
Here, we will demonstrate usage of the LocalDagRunner running locally on your machine. For production execution, the Airflow or Kubeflow runners may be more suitable.
Construct a pipeline
import os
import tempfile
from tfx import v1 as tfx
# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use an ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))
def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)
my_pipeline = function_based_pipeline()
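In this pipeline, MyConsumer takes the 'data' output of MyGenerator as its input, so the generator has to run first. As a purely conceptual sketch (not TFX's scheduling code), that kind of ordering can be derived from a dependency map with the standard library's `graphlib`, available from Python 3.9. The `dependencies` dictionary is a hypothetical hand-written description of the two components.

```python
from graphlib import TopologicalSorter

# Map each component to the set of components whose outputs it consumes.
dependencies = {
    'MyGenerator': set(),
    'MyConsumer': {'MyGenerator'},
}

# static_order() yields each node only after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['MyGenerator', 'MyConsumer']
```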
Run your pipeline with the LocalDagRunner
tfx.orchestration.LocalDagRunner().run(my_pipeline)
WARNING:absl:ArtifactQuery.property_predicate is not supported.
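Since the metadata store configured above is a local SQLite file, it can be opened with Python's `sqlite3` module for a quick look. The sketch below only lists table names. The helper `list_tables` and the table `ExampleTable` are hypothetical, and the actual ML Metadata schema is not shown or assumed here.

```python
import os
import sqlite3
import tempfile


def list_tables(db_path):
  """Return the sorted names of all tables in a SQLite database file."""
  conn = sqlite3.connect(db_path)
  try:
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'").fetchall()
  finally:
    conn.close()
  return sorted(name for (name,) in rows)


# Demonstration on a throwaway database with a hypothetical table name.
with tempfile.TemporaryDirectory() as tmp:
  demo_db = os.path.join(tmp, 'demo.sqlite')
  conn = sqlite3.connect(demo_db)
  conn.execute('CREATE TABLE ExampleTable (id INTEGER PRIMARY KEY)')
  conn.commit()
  conn.close()
  demo_tables = list_tables(demo_db)

print(demo_tables)
```

After a pipeline run, the same helper could be pointed at `os.path.join(PIPELINE_ROOT, 'metadata.sqlite')`.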
We can inspect the output artifacts generated by this pipeline execution.
!find {PIPELINE_ROOT}
/tmpfs/tmp/tmpcu4s98j0
/tmpfs/tmp/tmpcu4s98j0/MyGenerator
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/data
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/data/1
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/data/1/data_file.txt
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system/stateful_working_dir
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system/executor_execution
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system/executor_execution/1
/tmpfs/tmp/tmpcu4s98j0/metadata.sqlite
/tmpfs/tmp/tmpcu4s98j0/MyConsumer
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system/stateful_working_dir
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system/executor_execution
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system/executor_execution/2
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/hash
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/hash/2
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/hash/2/value
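A listing like the one above can also be produced from Python, which is handy outside a notebook where shell escapes are unavailable. This is a minimal sketch using `os.walk`. The helper name `list_tree` is hypothetical, and the demonstration builds a small temporary directory that mimics part of the layout rather than a real pipeline root.

```python
import os
import tempfile


def list_tree(root):
  """Return every directory and file path under `root`, sorted."""
  paths = [root]
  for dirpath, dirnames, filenames in os.walk(root):
    for name in dirnames + filenames:
      paths.append(os.path.join(dirpath, name))
  return sorted(paths)


# Demonstration on a temporary directory mimicking part of the layout above.
with tempfile.TemporaryDirectory() as tmp:
  leaf_dir = os.path.join(tmp, 'MyGenerator', 'data', '1')
  os.makedirs(leaf_dir)
  with open(os.path.join(leaf_dir, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')
  demo_paths = [os.path.relpath(p, tmp) for p in list_tree(tmp)]

print(demo_paths)
```

For the pipeline in this notebook, the call would be `list_tree(PIPELINE_ROOT)`.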
You have now written your own custom components and orchestrated their execution on the LocalDagRunner! For next steps, check out additional tutorials and guides on the TFX website.