
Create a TFX pipeline using templates


This document provides instructions for creating a TensorFlow Extended (TFX) pipeline using the templates provided with the TFX Python package. Most of the instructions are Linux shell commands, and corresponding Jupyter Notebook code cells that invoke those commands using ! are provided.

You will build a pipeline using the Taxi Trips dataset released by the City of Chicago. We strongly encourage you to build your own pipeline with your own dataset, using this pipeline as a baseline.


You can get all the prerequisites easily by launching this notebook on a Google Cloud AI Platform Notebook.

Step 1. Set up your environment.

You should prepare a development environment to build a pipeline, and a Kubeflow Pipeline cluster to run the newly built pipeline.

1a. Development environment

On your local machine

Install the tfx and kfp Python packages. kfp is required to use Kubeflow Pipelines (KFP) as an orchestrator engine.

You also need to download skaffold, a tool for building Docker images easily. A custom Docker image will be used when running a pipeline on KFP.

The template contains a couple of notebook files, and a Jupyter Notebook kernel with this virtualenv is required to run them.

You can use the following shell script snippet to set up your environment.

# Create a virtualenv for tfx.
virtualenv -p python3 venv
source venv/bin/activate
# Install python packages.
pip install -q tfx kfp
# Download skaffold.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64
chmod +x skaffold
mv skaffold venv/bin/
# Install a Jupyter Notebook kernel for this virtualenv.
python -m ipykernel install --user --name=tfx

On Cloud AI Platform Notebook

If you are using Cloud AI Platform Notebook, create a TensorFlow pre-installed instance for the notebook.

Install tfx, kfp, and skaffold, and add the installation path to the PATH environment variable.

NOTE: There might be some errors during package installation. For example, "ERROR: some-package 0.some_version.1 has requirement other-package!=2.0.,<3,>=1.15, but you'll have other-package 2.0.0 which is incompatible." Please ignore these errors for now.

TODO(b/149346490): TFX team is preparing a base image which includes tfx, kfp and skaffold by default. You won't have to install packages in this section in the near future.

# Install tfx and kfp Python packages.
!pip3 install --user --upgrade -q tfx
!pip3 install --user --upgrade -q kfp
# Download skaffold and set it executable.
!curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold && mv skaffold /home/jupyter/.local/bin/
# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

Let's check the version of TFX.

python -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
!python3 -c "import tfx; print('TFX version: {}'.format(tfx.__version__))"
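
If the version check fails, it helps to know which prerequisite is missing. The following is a minimal sketch, not part of the template, that reports whether the tfx and kfp packages can be imported and whether skaffold is on PATH. The function name `check_prerequisites` is hypothetical and it uses only the Python standard library.

```python
import importlib.util
import shutil


def check_prerequisites(packages=("tfx", "kfp"), tools=("skaffold",)):
    """Return a dict mapping each prerequisite name to True if it was found."""
    status = {}
    for name in packages:
        # find_spec returns None when a top-level package cannot be imported.
        status[name] = importlib.util.find_spec(name) is not None
    for name in tools:
        # shutil.which returns None when the executable is not on PATH.
        status[name] = shutil.which(name) is not None
    return status


if __name__ == "__main__":
    for name, found in check_prerequisites().items():
        print("{}: {}".format(name, "found" if found else "MISSING"))
```

Run it with the same interpreter (and, in a notebook, the same kernel) that you will use for the rest of the tutorial, because packages installed into another environment will be reported as missing.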

1b. Kubeflow Pipeline cluster

A TFX pipeline can be run on Kubernetes using Kubeflow Pipelines. If you don't have a Kubeflow Pipelines cluster, you can create one on GCP. This tutorial assumes that the cluster runs on GCP.

You must be logged in to cloud services to use cloud APIs. If you are using Google Cloud AI Platform Notebook, you are automatically logged in to GCP. Otherwise, log in using the gcloud utility.

Let's set some environment variables for using Kubeflow Pipelines.

First, check your GCP project ID. If you are using a terminal environment, you can find your project ID and set it to an environment variable with the following command.

export GCP_PROJECT_ID=$(gcloud config list --format 'value(core.project)' 2>/dev/null)
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GCP_PROJECT_ID=shell_output[0]
print("GCP project ID:" + GCP_PROJECT_ID)

We also need to access your KFP cluster. You can access it in your Google Cloud Console under the "AI Platform > Pipeline" menu. The "endpoint" of the KFP cluster can be found in the URL of the Pipelines dashboard. Let's set the endpoint to the ENDPOINT environment variable. ENDPOINT should contain only the host part of the URL. For example, if the URL of the KFP dashboard is https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start, the ENDPOINT value is 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com.

export ENDPOINT=XXXXXXX.pipelines.googleusercontent.com
Note: You MUST set your ENDPOINT value below.
# This refers to the KFP cluster endpoint
ENDPOINT='' # Enter your ENDPOINT here.
if not ENDPOINT:
    from absl import logging
    logging.error('Set your ENDPOINT in this cell.')
ERROR:absl:Set your ENDPOINT in this cell.
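
Copying only the host part by hand is easy to get wrong. As a small illustration, the host can be extracted from the dashboard URL with the standard library. The helper name `endpoint_from_dashboard_url` is hypothetical, and the URL is the example one used above.

```python
from urllib.parse import urlparse


def endpoint_from_dashboard_url(url):
    """Extract the host part of a KFP dashboard URL."""
    host = urlparse(url).netloc
    if not host:
        # A URL without a scheme has no network location to extract.
        raise ValueError("Expected a full URL starting with https://, got: " + repr(url))
    return host


ENDPOINT = endpoint_from_dashboard_url(
    "https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start"
)
print(ENDPOINT)
```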

As mentioned above, we will use a custom Docker image to run the pipeline on KFP. This image should be hosted on a Docker registry, and we recommend Google Container Registry (gcr.io). Please set the CUSTOM_TFX_IMAGE environment variable to an appropriate image name. For example, the following command sets the image name to tfx-pipeline under the current GCP project.

export CUSTOM_TFX_IMAGE=gcr.io/${GCP_PROJECT_ID}/tfx-pipeline
# Docker image name for the pipeline image 
CUSTOM_TFX_IMAGE='gcr.io/' + GCP_PROJECT_ID + '/tfx-pipeline'

And, it's done. We are ready to create a pipeline.

Step 2. Copy predefined template to your project directory.

In this step, we will create a working pipeline project by copying from a predefined template.

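Please decide on a name for the new pipeline and a project directory to put your files in. Let's define environment variables for these.

export PIPELINE_NAME="my_pipeline"
export PROJECT_DIR=~/AIHub/${PIPELINE_NAME}
PIPELINE_NAME="my_pipeline"
import os
# Project directory under your home directory. Adjust the path to suit your environment.
PROJECT_DIR=os.path.join(os.path.expanduser("~"),"AIHub",PIPELINE_NAME)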

TFX provides the taxi template with the tfx Python package. If you are planning to solve a point-wise prediction problem, including classification and regression, this template can be used as a starting point.

Use the tfx CLI to copy the predefined template to your project directory.

tfx template copy \
   --pipeline_name="${PIPELINE_NAME}" \
   --destination_path="${PROJECT_DIR}" \
   --model=taxi
!tfx template copy \
  --pipeline_name={PIPELINE_NAME} \
  --destination_path={PROJECT_DIR} \
  --model=taxi

Change working directory to the project directory which contains generated files.

cd ${PROJECT_DIR}
%cd {PROJECT_DIR}

If you are using Cloud AI Platform Notebook, don't forget to change the directory in the File Browser on the left side of the screen, too.

Step 3. Browse your copied source files.

The TFX template provides basic scaffold files for building a pipeline, including Python source code, sample data, and Jupyter Notebook files for analyzing the output of the pipeline. The taxi template uses the same Chicago Taxi dataset and ML model as the Tutorial.

Here is a brief introduction to each of the Python files.

  • configs.py: defines common constants for pipeline runners.
  • pipeline.py: defines TFX components and a pipeline.
  • beam_dag_runner.py / kubeflow_dag_runner.py: define runners for each orchestration engine.
  • features.py / features_test.py: defines features for the model.
  • hparams.py: defines hyperparameters of the model.
  • preprocessing.py / preprocessing_test.py: defines preprocessing jobs using tf.Transform.
  • model.py / model_test.py: defines a DNN model using a TF estimator.

You might notice that some files have _test.py in their names. They are unit tests for the pipeline, and we recommend adding more unit tests as you implement your model.

You can run the unit tests simply by passing a test file to the python binary.

python features_test.py
!python3 features_test.py

Step 4. Run your first TFX pipeline

The copied pipeline can be run using the tfx CLI. In this step, we will create pipelines using two orchestrator engines, Beam and Kubeflow.

4a. Using Beam orchestrator

Apache Beam can be used as an orchestrating engine for the pipeline without additional configuration.

You can create a pipeline using the pipeline create command.

tfx pipeline create --engine=beam --pipeline_path=beam_dag_runner.py

Then, you can run the created pipeline using the run create command.

tfx run create --engine=beam --pipeline_name="${PIPELINE_NAME}"

If successful, you'll see Component CsvExampleGen is finished. When you copy the template, only one component, CsvExampleGen, is included in the pipeline. The Beam orchestrator is useful for local experiments, but a production pipeline usually requires a more scalable and stable running environment such as Kubernetes.

4b. Using Kubeflow orchestrator

Components in the TFX pipeline generate outputs for each run, and these outputs need to be stored somewhere. You can use any storage that the KFP cluster can access; we will use Google Cloud Storage (GCS) in this document. If you created a KFP cluster in GCP, a default GCS bucket should have been created automatically. Its name starts with hostedkfp-default-.

To run this pipeline in KFP, edit configs.py to set your GCS bucket name. You can list your GCS buckets using the gsutil command.

# You can see your buckets using `gsutil`. The following command shows bucket names without the prefix and postfix.
!gsutil ls | cut -d / -f 3
gsutil ls

Set GCS_BUCKET_NAME in configs.py without gs:// or /. For example, if gsutil ls displayed gs://my-bucket, you should set my-bucket.

GCS_BUCKET_NAME = 'my-bucket'
Note: You MUST set your GCS bucket name in the `configs.py` file before proceeding.
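
To illustrate the expected format, here is a small sketch that turns the output of gsutil ls into the bare name that configs.py expects. The helper name `normalize_bucket_name` is hypothetical and is not part of the template.

```python
def normalize_bucket_name(value):
    """Strip an optional gs:// prefix and any surrounding slashes."""
    name = value.strip()
    if name.startswith("gs://"):
        name = name[len("gs://"):]
    name = name.strip("/")
    if not name or "/" in name:
        # Anything with an inner slash is an object path, not a bucket name.
        raise ValueError("Expected a bucket name or gs://bucket/, got: " + repr(value))
    return name


GCS_BUCKET_NAME = normalize_bucket_name("gs://my-bucket/")
print(GCS_BUCKET_NAME)  # my-bucket
```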

Let's create a pipeline on KFP.

tfx pipeline create  \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint=${ENDPOINT} \
--build_target_image=${CUSTOM_TFX_IMAGE}
Note: When creating a pipeline for KFP, we need a container image to run the pipeline, and `skaffold` builds that image for us. Because skaffold pulls base images from Docker Hub, the first build takes 5 to 10 minutes; subsequent builds take much less time.
!tfx pipeline create  \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
--build_target_image={CUSTOM_TFX_IMAGE}

While creating a pipeline, Dockerfile and build.yaml will be generated to build a Docker image. Don't forget to add these files to your source control system (for example, git) along with the other source files.

A pipeline definition file for Argo will be generated, too. The name of this file is ${PIPELINE_NAME}.tar.gz. For example, it will be my_pipeline.tar.gz if the name of your pipeline is my_pipeline. We recommend NOT including this pipeline definition file in source control, because it is generated from other Python files and is updated whenever you update the pipeline. For your convenience, this file is already listed in the automatically generated .gitignore.

NOTE: kubeflow will be automatically selected as an orchestration engine if airflow is not installed and --engine is not specified.

Then, you can run the created pipeline using the run create command.

tfx run create --pipeline_name="${PIPELINE_NAME}" --endpoint=${ENDPOINT}
!tfx run create --pipeline_name={PIPELINE_NAME} --endpoint={ENDPOINT}

Alternatively, you can run the pipeline from the KFP Dashboard.

You can see the run using the run list or run status command.

tfx run list --pipeline_name="${PIPELINE_NAME}" --endpoint=${ENDPOINT}

However, we recommend visiting your KFP Dashboard in a web browser. If you launched your KFP cluster in GCP, you can access the KFP Dashboard from the Cloud AI Platform Pipelines menu in Google Cloud Console. Once you visit the dashboard, you will be able to find the pipeline, the run, and much more information about the pipeline. For example, you can find your runs under the Experiments menu and all the artifacts from the pipeline under the Artifacts menu.

Note: If your pipeline run fails, you can see detailed logs in the KFP Dashboard. One of the major sources of failure is permission related problems. Please make sure your KFP cluster has permissions to access Google Cloud APIs. This can be configured [when you create a KFP cluster in GCP](https://cloud.google.com/ai-platform/pipelines/docs/setting-up), or see [Troubleshooting document in GCP](https://cloud.google.com/ai-platform/pipelines/docs/troubleshooting).

Step 5. Add components for data validation.

In this step, you will add components for data validation, including StatisticsGen, SchemaGen, and ExampleValidator. If you are interested in data validation, please see Get started with TensorFlow Data Validation.

Open pipeline.py with an editor. Find and uncomment 3 lines which add StatisticsGen, SchemaGen, and ExampleValidator to the pipeline. (Tip: search TODO(step 5):)
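
If you prefer not to search by hand, the marker lines can be located with a few lines of Python. This is only a sketch. The helper name `find_step_markers` is hypothetical, and it assumes the markers appear in the file exactly as "TODO(step N):".

```python
def find_step_markers(path, step):
    """Return (line_number, text) pairs for lines mentioning TODO(step N):."""
    marker = "TODO(step {}):".format(step)
    hits = []
    with open(path, encoding="utf-8") as source:
        for number, line in enumerate(source, start=1):
            if marker in line:
                hits.append((number, line.rstrip("\n")))
    return hits


# Example usage from the project directory:
# for number, text in find_step_markers("pipeline.py", 5):
#     print(number, text)
```

The same helper works for the later steps by changing the step number.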

You need to update the existing pipeline with the modified pipeline definition. Use the pipeline update command of the tfx CLI.

If you are using the Beam orchestrator,

# Update the pipeline
tfx pipeline update --engine=beam --pipeline_path=beam_dag_runner.py
# You can run the pipeline the same way.
tfx run create --engine beam --pipeline_name "${PIPELINE_NAME}"

If you are using the Kubeflow orchestrator,

# Update the pipeline
tfx pipeline update \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint=${ENDPOINT}

# You can run the pipeline the same way.
tfx run create --pipeline_name "${PIPELINE_NAME}"
# Update the pipeline
!tfx pipeline update \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
# You can run the pipeline the same way.
!tfx run create --pipeline_name {PIPELINE_NAME} --endpoint={ENDPOINT}
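
The update-then-run pair is repeated in every remaining step, so it can be convenient to wrap it. The sketch below builds the two command lines from the flags used in this document and runs them with subprocess. The function names are hypothetical, and passing --engine=kubeflow explicitly (rather than relying on automatic engine selection) is an assumption.

```python
import subprocess


def build_update_and_run(pipeline_name, engine="kubeflow", endpoint=None):
    """Return the argv lists for `tfx pipeline update` and `tfx run create`."""
    runner = "beam_dag_runner.py" if engine == "beam" else "kubeflow_dag_runner.py"
    update = ["tfx", "pipeline", "update",
              "--engine=" + engine, "--pipeline_path=" + runner]
    run = ["tfx", "run", "create",
           "--engine=" + engine, "--pipeline_name=" + pipeline_name]
    if endpoint:
        # The endpoint only applies to the Kubeflow orchestrator.
        update.append("--endpoint=" + endpoint)
        run.append("--endpoint=" + endpoint)
    return update, run


def update_and_run(pipeline_name, engine="kubeflow", endpoint=None):
    """Run both commands in order, stopping if the update fails."""
    for argv in build_update_and_run(pipeline_name, engine, endpoint):
        subprocess.run(argv, check=True)


# Example usage:
# update_and_run("my_pipeline", engine="beam")
```

Because check=True raises on a non-zero exit code, a failed update does not go on to start a run against the stale pipeline definition.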

Check pipeline outputs

If you are using the Beam orchestrator, open data_validation.ipynb with Jupyter Notebook.

For the Kubeflow orchestrator, visit the KFP dashboard, where you can find the pipeline outputs on the page for your pipeline run. Click the "Experiments" tab on the left, then "All runs" on the Experiments page. You should be able to find the run with the name of your pipeline.

Step 6. Add components for training.

In this step, you will add components for training and model validation, including Transform, Trainer, ModelValidator, and Pusher. These components implement a basic ML model using a simple DNN. You can find more details about the model in the Tutorial.

Open pipeline.py with an editor. Find and uncomment 4 lines which add Transform, Trainer, ModelValidator and Pusher to the pipeline. (Tip: search TODO(step 6):)

You need to update the existing pipeline with the modified pipeline definition again. The instructions are the same as in Step 5: update the pipeline using pipeline update, and create a run using run create.

!tfx pipeline update \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline_name {PIPELINE_NAME} --endpoint={ENDPOINT}

If you are not using Cloud AI Platform Notebook, check the newly trained model with model_analysis.ipynb notebook. TFMA Jupyter extension is required to see the visualization. See instructions in the notebook file.

NOTE: This notebook file doesn't work on Cloud AI Platform Notebook or other JupyterLab environments.

Step 7. (Optional) Try BigQueryExampleGen.

BigQuery is a serverless, highly scalable, and cost-effective cloud data warehouse. BigQuery can be used as a source of training examples in TFX. In this step, we will add BigQueryExampleGen to the pipeline.

Open pipeline.py with an editor. Comment out CsvExampleGen and uncomment the line that creates an instance of BigQueryExampleGen. You also need to uncomment the import statement and the query argument of the create_pipeline function.

We need to specify which GCP project to use for BigQuery, which is done by setting --project in beam_pipeline_args when creating a pipeline. Open configs.py and uncomment the definitions of GCP_PROJECT_ID, GCP_REGION, BIG_QUERY_BEAM_PIPELINE_ARGS, and BIG_QUERY_QUERY. Replace the project ID and region values in this file with your own.

Note: You MUST set your GCP project ID and region in the configs.py file before proceeding.
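
For orientation, the uncommented definitions might look roughly like the sketch below. The variable names come from this document, but every value is a placeholder assumption (project, region, dataset, and table are hypothetical). The generated configs.py is the authoritative version and ships its own query.

```python
# Hypothetical values; replace them with your own project and region.
GCP_PROJECT_ID = "my-gcp-project"
GCP_REGION = "us-central1"

# Beam needs to know which project to use for the BigQuery query.
BIG_QUERY_BEAM_PIPELINE_ARGS = ["--project=" + GCP_PROJECT_ID]

# Placeholder query against a hypothetical table.
BIG_QUERY_QUERY = """
SELECT *
FROM `my-gcp-project.my_dataset.my_table`
LIMIT 1000
"""
```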

Lastly, open kubeflow_dag_runner.py (or beam_dag_runner.py if you use the Beam orchestrator) and uncomment two arguments, query and beam_pipeline_args, of the create_pipeline() function.

Now the pipeline is ready to use BigQuery as an example source. Update the pipeline and create a run as we did in Steps 5 and 6.

!tfx pipeline update \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline_name {PIPELINE_NAME} --endpoint={ENDPOINT}

Step 8. (Optional) Try Dataflow with KFP.

Several TFX components use Apache Beam to implement data-parallel pipelines, which means that you can distribute data processing workloads using Google Cloud Dataflow. In this step, we will set the Kubeflow orchestrator to use Dataflow as the data processing back-end for Apache Beam.

Open configs.py with an editor, and uncomment the definitions of GCP_PROJECT_ID, GCP_REGION, and BEAM_PIPELINE_ARGS. Open kubeflow_dag_runner.py and uncomment beam_pipeline_args. (Comment out the current beam_pipeline_args that you added in Step 7.)
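
As a rough illustration of what Beam arguments for Dataflow usually contain, the sketch below assembles a typical option list. The helper name `dataflow_pipeline_args` is hypothetical and the temporary location under the bucket is an assumption. The BEAM_PIPELINE_ARGS definition in the generated configs.py is the authoritative version and may differ.

```python
def dataflow_pipeline_args(project_id, region, bucket_name):
    """Build Beam options that route processing to Cloud Dataflow."""
    return [
        # Select the Dataflow runner instead of the local default.
        "--runner=DataflowRunner",
        "--project=" + project_id,
        "--region=" + region,
        # Dataflow stages temporary files in GCS; this path is an assumption.
        "--temp_location=gs://{}/tmp".format(bucket_name),
    ]


BEAM_PIPELINE_ARGS = dataflow_pipeline_args(
    "my-gcp-project", "us-central1", "my-bucket")
```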

Now the pipeline is ready to use Dataflow. Update the pipeline and create a run as we did in Steps 5 and 6.

!tfx pipeline update \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline_name {PIPELINE_NAME} --endpoint={ENDPOINT}

You can find your Dataflow jobs in Dataflow in Cloud Console.

Step 9. (Optional) Try Cloud AI Platform Training and Prediction with KFP.

TFX interoperates with several managed GCP services, such as Cloud AI Platform for Training and Prediction. You can set your Trainer component to use Cloud AI Platform Training, a managed service for ML training workloads. Moreover, when your model is built and ready to be served, you can push it to Cloud AI Platform Prediction for serving. In this step, we will set our Trainer and Pusher components to use Cloud AI Platform services.

Before editing files, you might have to enable the AI Platform Training & Prediction API first.

Open configs.py with an editor, and uncomment the definition of GCP_PROJECT_ID, GCP_REGION, GCP_AI_PLATFORM_TRAINING_ARGS and GCP_AI_PLATFORM_SERVING_ARGS. We will use our custom built container image to train a model in Cloud AI Platform Training, so we should set masterConfig.imageUri in GCP_AI_PLATFORM_TRAINING_ARGS to the same value as CUSTOM_TFX_IMAGE above.
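
To make the masterConfig.imageUri setting concrete, the two dictionaries might be shaped roughly as follows. This is a sketch under assumptions: only masterConfig.imageUri is named in this document, and the other keys and all values are illustrative placeholders. Check them against the generated configs.py.

```python
# Hypothetical values; replace them with your own project and region.
GCP_PROJECT_ID = "my-gcp-project"
GCP_REGION = "us-central1"
CUSTOM_TFX_IMAGE = "gcr.io/" + GCP_PROJECT_ID + "/tfx-pipeline"

GCP_AI_PLATFORM_TRAINING_ARGS = {
    "project": GCP_PROJECT_ID,
    "region": GCP_REGION,
    # Train inside the same custom image that runs the pipeline on KFP.
    "masterConfig": {"imageUri": CUSTOM_TFX_IMAGE},
}

GCP_AI_PLATFORM_SERVING_ARGS = {
    "model_name": "my_pipeline",
    "project_id": GCP_PROJECT_ID,
    "regions": [GCP_REGION],
}
```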

Open kubeflow_dag_runner.py and uncomment ai_platform_training_args and ai_platform_serving_args.

Update the pipeline and create a run as we did in Steps 5 and 6.

!tfx pipeline update \
--pipeline_path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
!tfx run create --pipeline_name {PIPELINE_NAME} --endpoint={ENDPOINT}

You can find your training jobs in Cloud AI Platform Jobs. If your pipeline was completed successfully, you can find your model in Cloud AI Platform Models.

Step 10. Ingest YOUR data to the pipeline.

We made a pipeline for a model using the Chicago Taxi dataset. Now it's time to put your data into the pipeline. Your data can be stored anywhere your pipeline can access, including GCS or BigQuery. You need to modify the pipeline definition to accommodate your data.

  1. If your data is stored in files, modify DATA_PATH in kubeflow_dag_runner.py or beam_dag_runner.py to the location. If your data is stored in BigQuery, modify BIG_QUERY_QUERY in configs.py to your query statement.
  2. Add features in features.py.
  3. Modify preprocessing.py to transform input data for training.
  4. Modify model.py and hparams.py to describe your ML model.
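
When adapting features.py to a CSV dataset of your own, a quick sanity check is to compare the feature names you plan to use against the CSV header. The following sketch does that with the standard library. The helper name `missing_feature_columns` and the column names in the usage example are hypothetical.

```python
import csv


def missing_feature_columns(csv_path, feature_keys):
    """Return the feature keys that do not appear in the CSV header row."""
    with open(csv_path, newline="", encoding="utf-8") as handle:
        # The first row of the file is treated as the header.
        header = next(csv.reader(handle), [])
    present = set(header)
    return [key for key in feature_keys if key not in present]


# Example usage with hypothetical file and column names:
# print(missing_feature_columns("data/data.csv", ["trip_miles", "fare"]))
```

An empty list means every listed feature has a matching column.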

Please see the Trainer component guide for more information.

Cleaning up

To clean up all Google Cloud resources used in this project, you can delete the Google Cloud project you used for the tutorial.

Alternatively, you can clean up individual resources by visiting each console: