TFX Python function component tutorial

This notebook contains an examples on how to author and run Python function components within the TFX InteractiveContext and in a locally-orchestrated TFX pipeline.

For more context and information, see the Custom Python function components page on the TFX documentation site.

Setup

We will first install TFX and import necessary modules. TFX requires Python 3.

Check the system Python version

import sys
sys.version
'3.9.17 (main, Jun  6 2023, 20:11:04) \n[GCC 9.4.0]'

Upgrade Pip

To avoid upgrading Pip in a system when running locally, check to make sure that we're running in Colab. Local systems can of course be upgraded separately.

try:
  import colab
  !pip install --upgrade pip
except:
  pass

Install TFX

pip install tfx

Uninstall shapely

TODO(b/263441833) This is a temporal solution to avoid an ImportError. Ultimately, it should be handled by supporting a recent version of Bigquery, instead of uninstalling other extra dependencies.

pip uninstall shapely -y

Did you restart the runtime?

If you are using Google Colab, the first time that you run the cell above, you must restart the runtime (Runtime > Restart runtime ...). This is because of the way that Colab loads packages.

Import packages

We import TFX and check its version.

# Check version
from tfx import v1 as tfx
tfx.__version__
'1.13.0'

Custom Python function components

In this section, we will create components from Python functions. We will not be doing any real ML problem — these simple functions are just used to illustrate the Python function component development process.

See Python function based component guide for more documentation.

Create Python custom components

We begin by writing a function that generate some dummy data. This is written to its own Python module file.

%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py

Next, we write a second component that uses the dummy data produced. We will just calculate hash of the data and return it.

%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculate."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py

Run in-notebook with the InteractiveContext

Now, we will demonstrate usage of our new components in the TFX InteractiveContext.

For more information on what you can do with the TFX notebook InteractiveContext, see the in-notebook TFX Keras Component Tutorial.

from my_generator import MyGenerator
from my_consumer import MyConsumer

Construct the InteractiveContext

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmpfs/tmp/tfx-interactive-2023-07-28T10_25_07.019368-ljo_tv2_ as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmpfs/tmp/tfx-interactive-2023-07-28T10_25_07.019368-ljo_tv2_/metadata.sqlite.

Run your component interactively with context.run()

Next, we run our components interactively within the notebook with context.run(). Our consumer component uses the outputs of the generator component.

generator = MyGenerator()
context.run(generator)
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)

After execution, we can inspect the contents of the "hash" output artifact of the consumer component on disk.

tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmpfs/tmp/tfx-interactive-2023-07-28T10_25_07.019368-ljo_tv2_/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1

That's it, and you've now written and executed your own custom components!

Write a pipeline definition

Next, we will author a pipeline using these same components. While using the InteractiveContext within a notebook works well for experimentation, defining a pipeline lets you deploy your pipeline on local or remote runners for production usage.

Here, we will demonstrate usage of the LocalDagRunner running locally on your machine. For production execution, the Airflow or Kubeflow runners may be more suitable.

Construct a pipeline

import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

Run your pipeline with the LocalDagRunner

tfx.orchestration.LocalDagRunner().run(my_pipeline)
WARNING:absl:ArtifactQuery.property_predicate is not supported.

We can inspect the output artifacts generated by this pipeline execution.

find {PIPELINE_ROOT}
/tmpfs/tmp/tmp7icf5apc
/tmpfs/tmp/tmp7icf5apc/MyGenerator
/tmpfs/tmp/tmp7icf5apc/MyGenerator/data
/tmpfs/tmp/tmp7icf5apc/MyGenerator/data/1
/tmpfs/tmp/tmp7icf5apc/MyGenerator/data/1/data_file.txt
/tmpfs/tmp/tmp7icf5apc/MyGenerator/.system
/tmpfs/tmp/tmp7icf5apc/MyGenerator/.system/executor_execution
/tmpfs/tmp/tmp7icf5apc/MyGenerator/.system/executor_execution/1
/tmpfs/tmp/tmp7icf5apc/MyConsumer
/tmpfs/tmp/tmp7icf5apc/MyConsumer/hash
/tmpfs/tmp/tmp7icf5apc/MyConsumer/hash/2
/tmpfs/tmp/tmp7icf5apc/MyConsumer/hash/2/value
/tmpfs/tmp/tmp7icf5apc/MyConsumer/.system
/tmpfs/tmp/tmp7icf5apc/MyConsumer/.system/executor_execution
/tmpfs/tmp/tmp7icf5apc/MyConsumer/.system/executor_execution/2
/tmpfs/tmp/tmp7icf5apc/metadata.sqlite

You have now written your own custom components and orchestrated their execution on the LocalDagRunner! For next steps, check out additional tutorials and guides on the TFX website.