TFX Python function component tutorial

This notebook contains examples of how to author and run Python function components within the TFX InteractiveContext and in a locally orchestrated TFX pipeline.

For more context and information, see the Custom Python function components page on the TFX documentation site.


We will first install TFX and import necessary modules. TFX requires Python 3.

Check the system Python version

import sys
sys.version
'3.6.9 (default, Jan 26 2021, 15:33:00) \n[GCC 8.4.0]'
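If you would rather fail fast than read the version string by eye, the check can be turned into a guard. The following is only a sketch: `check_python_3` is a hypothetical helper name, not part of TFX, and it tests nothing beyond the major version.

```python
import sys


def check_python_3(version_info=sys.version_info) -> None:
    """Raise RuntimeError if the interpreter's major version is below 3."""
    if version_info[0] < 3:
        raise RuntimeError(
            "Python 3 is required, found %d.%d"
            % (version_info[0], version_info[1]))


# Check the running interpreter; this raises on Python 2.
check_python_3()
print("Python %d.%d is fine." % sys.version_info[:2])
```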

Upgrade Pip

To avoid upgrading Pip on a local system, we first check that we are running in Colab. Local systems can of course be upgraded separately.

try:
  import colab
  !pip install -q --upgrade pip
except ImportError:
  # Not running in Colab; leave the local Pip installation untouched.
  pass
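The same check can be written as a small reusable function. This is a sketch under one assumption: that Colab has already loaded its `google.colab` helper package when the cell runs, so that looking for it in `sys.modules` is enough. The name `running_in_colab` is hypothetical and not part of TFX or Colab.

```python
import sys


def running_in_colab() -> bool:
    """Best-effort check: True if the google.colab package is already loaded."""
    # Assumption: Colab kernels import google.colab at startup.
    return "google.colab" in sys.modules


if running_in_colab():
    print("Colab detected: upgrading pip here affects only this runtime.")
else:
    print("Not in Colab: leaving the local pip installation untouched.")
```

Checking `sys.modules` avoids importing anything, so the check has no side effects on a local system.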

Install TFX

!pip install -q -U tfx

Did you restart the runtime?

If you are using Google Colab, the first time that you run the cell above, you must restart the runtime (Runtime > Restart runtime ...). This is because of the way that Colab loads packages.

Import packages

We import TFX and check its version.

# Check version
import tfx
tfx.__version__

Custom Python function components

In this section, we will create components from Python functions. We will not be solving any real ML problem; these simple functions only illustrate the Python function component development process.

See the Python function-based component guide for more documentation.

Create Python custom components

We begin by writing a function that generates some dummy data. It is written to its own Python module file.


%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import OutputArtifact

@component
def MyGenerator(data: OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')

Next, we write a second component that uses the dummy data produced. It simply calculates a hash of the data and returns it.


%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact, Parameter

@component
def MyConsumer(data: InputArtifact[Dataset],
               hash: OutputArtifact[String],
               algorithm: Parameter[str] = 'sha256'):
  """Reads the contents of data and calculates its hash."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  # Hash the contents with the algorithm named by the `algorithm` parameter.
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
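To see what the consumer computes without involving TFX at all, the hashing step can be tried on its own. This is a standalone sketch using only the standard library. `hash_contents` is a hypothetical helper name, and it encodes the text as UTF-8 where the component uses `tf.compat.as_bytes`.

```python
import hashlib


def hash_contents(contents: str, algorithm: str = "sha256") -> str:
    """Return the hex digest of `contents` using the named hashlib algorithm."""
    h = hashlib.new(algorithm)  # Raises ValueError for unknown algorithm names.
    h.update(contents.encode("utf-8"))
    return h.hexdigest()


# The same dummy text the generator writes, hashed with two algorithms.
sha256_digest = hash_contents("Dummy data")
sha512_digest = hash_contents("Dummy data", algorithm="sha512")
print(sha256_digest)
print(sha512_digest)
```

Because the algorithm is looked up by name, switching it is a matter of passing a different string, which is what the component's `algorithm` parameter allows.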

Run in-notebook with the InteractiveContext

Now, we will demonstrate usage of our new components in the TFX InteractiveContext.

For more information on what you can do with the TFX notebook InteractiveContext, see the in-notebook TFX Keras Component Tutorial.

from tfx.orchestration.pipeline import Pipeline

from my_generator import MyGenerator
from my_consumer import MyConsumer
WARNING:absl:RuntimeParameter is only supported on Cloud-based DAG runner currently.

Construct the InteractiveContext

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2021-03-23T09_11_08.177410-jmuwsnzs as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2021-03-23T09_11_08.177410-jmuwsnzs/metadata.sqlite.

Run your component interactively with context.run()

Next, we run our components interactively within the notebook with context.run(). Our consumer component uses the outputs of the generator component.

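generator = MyGenerator()
context.run(generator)
consumer = MyConsumer(
    data=generator.outputs['data'])  # `algorithm` keeps its default, 'sha256'.
context.run(consumer)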

After execution, we can inspect the contents of the "hash" output artifact of the consumer component on disk.

!tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmp/tfx-interactive-2021-03-23T09_11_08.177410-jmuwsnzs/MyConsumer/hash/2/value <==

That's it, and you've now written and executed your own custom components!

Write a pipeline definition

Next, we will author a pipeline using these same components. While using the InteractiveContext within a notebook works well for experimentation, defining a pipeline lets you deploy your pipeline on local or remote runners for production usage.

Here, we will demonstrate usage of the LocalDagRunner running locally on your machine. For production execution, the Airflow or Kubeflow runners may be more suitable.

Construct a pipeline

import os
import tempfile
from tfx.orchestration import metadata

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use an ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
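  consumer = MyConsumer(
      data=generator.outputs['data'])

  return Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)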

my_pipeline = function_based_pipeline()

Run your pipeline with the LocalDagRunner

from tfx.orchestration.local.local_dag_runner import LocalDagRunner

LocalDagRunner().run(my_pipeline)

WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/tmp/tmp5_rj_ays.json', '--HistoryManager.hist_file=:memory:']
WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['-f', '/tmp/tmp5_rj_ays.json', '--HistoryManager.hist_file=:memory:']

We can inspect the output artifacts generated by this pipeline execution.
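One way to do this is to walk the pipeline root and list every file beneath it. The sketch below uses only the standard library. `list_artifact_files` is a hypothetical helper, and the demo directory layout is made up for illustration, loosely modelled on the `MyConsumer/hash/2/value` path seen earlier. In the notebook you would pass `PIPELINE_ROOT` instead.

```python
import os
import tempfile


def list_artifact_files(root: str) -> list:
    """Return the sorted relative paths of all files found under `root`."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            full_path = os.path.join(dirpath, name)
            found.append(os.path.relpath(full_path, root))
    return sorted(found)


# Demo on a throwaway directory with a made-up layout (not real TFX output).
demo_root = tempfile.mkdtemp()
demo_dir = os.path.join(demo_root, "MyGenerator", "data", "1")
os.makedirs(demo_dir)
with open(os.path.join(demo_dir, "data_file.txt"), "w") as f:
    f.write("Dummy data")

demo_files = list_artifact_files(demo_root)
for path in demo_files:
    print(path)
```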


You have now written your own custom components and orchestrated their execution on the LocalDagRunner! For next steps, check out additional tutorials and guides on the TFX website.