Introduction
This document will provide instructions to create a TensorFlow Extended (TFX) pipeline for your own dataset using penguin template which is provided with TFX Python package. Created pipeline will be using Palmer Penguins dataset initially, but we will transform the pipeline for your dataset.
Prerequisites
- Linux / MacOS
- Python 3.6-3.8
- Jupyter notebook
Step 1. Copy the predefined template to your project directory.
In this step, we will create a working pipeline project directory and files by copying files from penguin template in TFX. You can think of this as a scaffold for your TFX pipeline project.
Update Pip
If we're running in Colab then we should make sure that we have the latest version of Pip. Local systems can of course be updated separately.
import sys
if 'google.colab' in sys.modules:
!pip install --upgrade pip
Install required package
First, install TFX and TensorFlow Model Analysis (TFMA).
pip install -U tfx tensorflow-model-analysis
Let's check the versions of TFX.
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx
print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
2024-04-30 10:26:21.478406: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:26:21.478458: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:26:21.480153: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered TF version: 2.15.1 TFMA version: 0.46.0 TFX version: 1.15.0
We are ready to create a pipeline.
Set PROJECT_DIR
to appropriate destination for your environment.
Default value is ~/imported/${PIPELINE_NAME}
which is appropriate for
Google Cloud AI Platform Notebook
environment.
You may give your pipeline a different name by changing the PIPELINE_NAME
below. This will also become the name of the project directory where your
files will be put.
PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)
Copy template files.
TFX includes the penguin
template with the TFX python package.
penguin
template
contains many instructions to bring your dataset into the pipeline which is
the purpose of this tutorial.
The tfx template copy
CLI command copies predefined template files into your
project directory.
# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
!tfx template copy \
--pipeline-name={PIPELINE_NAME} \
--destination-path={PROJECT_DIR} \
--model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/cuda/bin:/opt/android-sdk/current/cmdline-tools/tools/bin:/opt/android-sdk/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin:/usr/local/cuda/bin:/opt/android-sdk/current/cmdline-tools/tools/bin:/opt/android-sdk/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/home/kbuilder/.local/bin:/home/jupyter/.local/bin 2024-04-30 10:26:25.552132: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:26:25.552181: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:26:25.553635: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Copying penguin pipeline template kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py __init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py model.py -> /home/kbuilder/imported/my_pipeline/models/model.py __init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py features.py -> /home/kbuilder/imported/my_pipeline/models/features.py model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py __init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
Change the working directory context in this notebook to the project directory.
%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline
Browse your copied source files
The TFX template provides basic scaffold files to build a pipeline,
including Python source code and sample data. The penguin
template uses the
same Palmer Penguins dataset and ML model as the
Penguin example.
Here is brief introduction to each of the Python files.
pipeline
- This directory contains the definition of the pipelineconfigs.py
— defines common constants for pipeline runnerspipeline.py
— defines TFX components and a pipeline
models
- This directory contains ML model definitionsfeatures.py
,features_test.py
— defines features for the modelpreprocessing.py
,preprocessing_test.py
— defines preprocessing routines for dataconstants.py
— defines constants of the modelmodel.py
,model_test.py
— defines ML model using ML frameworks like TensorFlow
local_runner.py
— define a runner for local environment which uses local orchestration enginekubeflow_runner.py
— define a runner for Kubeflow Pipelines orchestration engine
By default, the template only includes standard TFX components. If you need some customized actions, you can create custom components for your pipeline. Please see TFX custom component guide for the detail.
Unit-test files.
You might notice that there are some files with _test.py
in their name.
These are unit tests of the pipeline and it is recommended to add more unit
tests as you implement your own pipelines.
You can run unit tests by supplying the module name of test files with -m
flag. You can usually get a module name by deleting .py
extension and
replacing /
with .
. For example:
import sys
!{sys.executable} -m models.features_test
2024-04-30 10:26:29.771379: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:26:29.771428: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:26:29.772789: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered Running tests under Python 3.9.19: /tmpfs/src/tf_docs_env/bin/python [ RUN ] FeaturesTest.testLabelKey INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s I0430 10:26:32.408985 139818587490112 test_util.py:2574] time(__main__.FeaturesTest.testLabelKey): 0.0s [ OK ] FeaturesTest.testLabelKey [ RUN ] FeaturesTest.test_session [ SKIPPED ] FeaturesTest.test_session ---------------------------------------------------------------------- Ran 2 tests in 0.001s OK (skipped=1)
Create a TFX pipeline in local environment.
TFX supports several orchestration engines to run pipelines. We will use local orchestration engine. Local orchestration engine runs without any further dependencies, and it is suitable for development and debugging because it runs on local environment rather than depends on remote computing clusters.
We will use local_runner.py
to run your pipeline using local
orchestrator. You have to create a pipeline before running it. You can create
a pipeline with pipeline create
command.
tfx pipeline create --engine=local --pipeline_path=local_runner.py
2024-04-30 10:26:33.771789: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:26:33.771844: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:26:33.773251: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Creating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" created successfully.
pipeline create
command registers your pipeline defined in local_runner.py
without actually running it.
You will run the created pipeline with run create
command in following steps.
Step 2. Ingest YOUR data to the pipeline.
The initial pipeline ingests the penguin dataset which is included in the template. You need to put your data into the pipeline, and most TFX pipelines start with ExampleGen component.
Choose an ExampleGen
Your data can be stored anywhere your pipeline can access, on either a local or distributed filesystem, or a query-able system. TFX provides various
ExampleGen
components
to bring your data into a TFX pipeline. You can choose one from following
example generating components.
- CsvExampleGen: Reads CSV files in a directory. Used in penguin example and Chicago taxi example.
- ImportExampleGen: Takes TFRecord files with TF Example data format. Used in MNIST examples.
- FileBasedExampleGen for Avro or Parquet format.
- BigQueryExampleGen: Reads data in Google Cloud BigQuery directly. Used in Chicago taxi examples.
You can also create your own ExampleGen, for example, tfx includes a custom ExecampleGen which uses Presto as a data source. See the guide for more information on how to use and develop custom executors.
Once you decide which ExampleGen to use, you will need to modify the pipeline definition to use your data.
Modify the
DATA_PATH
inlocal_runner.py
and set it to the location of your files.- If you have files in local environment, specify the path. This is the best option for developing or debugging a pipeline.
- If the files are stored in GCS, you can use a path starting with
gs://{bucket_name}/...
. Please make sure that you can access GCS from your terminal, for example, usinggsutil
. Please follow authorization guide in Google Cloud if needed. - If you want to use a Query-based ExampleGen like BigQueryExampleGen, you need a Query statement to select data from the data source. There are a few more things you need to set to use Google Cloud BigQuery as a data source.
- In
pipeline/configs.py
:- Change
GOOGLE_CLOUD_PROJECT
andGCS_BUCKET_NAME
to your GCP project and bucket name. The bucket should exist before we run the pipeline. - Uncomment
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS
variable. - Uncomment and set
BIG_QUERY_QUERY
variable to your query statement.
- Change
- In
local_runner.py
:- Comment out
data_path
argument and uncommentquery
argument instead inpipeline.create_pipeline()
.
- Comment out
- In
pipeline/pipeline.py
:- Comment out
data_path
argument and uncommentquery
argument increate_pipeline()
. - Use BigQueryExampleGen instead of CsvExampleGen.
- Comment out
Replace existing CsvExampleGen to your ExampleGen class in
pipeline/pipeline.py
. Each ExampleGen class has different signature. Please see ExampleGen component guide for more detail. Don't forget to import required modules withimport
statements inpipeline/pipeline.py
.
The initial pipeline is consist of four components, ExampleGen
,
StatisticsGen
, SchemaGen
and ExampleValidator
. We don't need to change
anything for StatisticsGen
, SchemaGen
and ExampleValidator
. Let's run the
pipeline for the first time.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
2024-04-30 10:26:41.921883: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:26:41.921929: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:26:41.923320: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. 2024-04-30 10:26:49.944802: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:26:49.944849: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:26:49.946315: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Creating a run for pipeline: my_pipeline 2024-04-30 10:26:55.445706: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:26:55.445751: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:26:55.447294: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/a0cf15c4-941d-446e-8b9b-c35ccd2265a7', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:00.897615', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/a0cf15c4-941d-446e-8b9b-c35ccd2265a7 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[StatisticsGen] Resolved inputs: ({'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472821816 last_update_time_since_epoch: 1714472821816 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472821816 last_update_time_since_epoch: 1714472821816 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/5b69dd77-cc39-437f-b558-ac65633a3f4b', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:00.897615', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/5b69dd77-cc39-437f-b558-ac65633a3f4b INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[SchemaGen] Resolved inputs: ({'statistics': [Artifact(artifact: id: 2 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472824520 last_update_time_since_epoch: 1714472824520 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472824520 last_update_time_since_epoch: 1714472824520 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/3645251e-2e5d-4be6-8c74-f4e0f6290f14', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:00.897615" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:00.897615', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/3645251e-2e5d-4be6-8c74-f4e0f6290f14 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" , artifact_type: name: "Schema" )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
You should see "Component ExampleValidator is finished." if the pipeline ran successfully.
Examine output of the pipeline.
TFX pipeline produces two kinds of output, artifacts and a
metadata DB(MLMD) which contains
metadata of artifacts and pipeline executions. The location to the output is
defined in local_runner.py
. By default, artifacts are stored under
tfx_pipeline_output
directory and metadata is stored as an sqlite database
under tfx_metadata
directory.
You can use MLMD APIs to examine these outputs. First, we will define some utility functions to search output artifacts that were just produced.
import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils
# TODO(b/171447278): Move these functions into TFX library.
def get_latest_executions(store, pipeline_name, component_id = None):
"""Fetch all pipeline runs."""
if component_id is None: # Find entire pipeline runs.
run_contexts = [
c for c in store.get_contexts_by_type('run')
if c.properties['pipeline_name'].string_value == pipeline_name
]
else: # Find specific component runs.
run_contexts = [
c for c in store.get_contexts_by_type('component_run')
if c.properties['pipeline_name'].string_value == pipeline_name and
c.properties['component_id'].string_value == component_id
]
if not run_contexts:
return []
# Pick the latest run context.
latest_context = max(run_contexts,
key=lambda c: c.last_update_time_since_epoch)
return store.get_executions_by_context(latest_context.id)
def get_latest_artifacts(store, pipeline_name, component_id = None):
"""Fetch all artifacts from latest pipeline execution."""
executions = get_latest_executions(store, pipeline_name, component_id)
# Fetch all artifacts produced from the given executions.
execution_ids = [e.id for e in executions]
events = store.get_events_by_execution_ids(execution_ids)
artifact_ids = [
event.artifact_id for event in events
if event.type == metadata_store_pb2.Event.OUTPUT
]
return store.get_artifacts_by_id(artifact_ids)
def find_latest_artifacts_by_type(store, artifacts, artifact_type):
"""Get the latest artifacts of a specified type."""
# Get type information from MLMD
try:
artifact_type = store.get_artifact_type(artifact_type)
except errors.NotFoundError:
return []
# Filter artifacts with type.
filtered_artifacts = [aritfact for aritfact in artifacts
if aritfact.type_id == artifact_type.id]
# Convert MLMD artifact data into TFX Artifact instances.
return [artifact_utils.deserialize_artifact(artifact_type, artifact)
for artifact in filtered_artifacts]
from tfx.orchestration.experimental.interactive import visualizations
def visualize_artifacts(artifacts):
"""Visualizes artifacts using standard visualization modules."""
for artifact in artifacts:
visualization = visualizations.get_registry().get_visualization(
artifact.type_name)
if visualization:
visualization.display(artifact)
from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()
import pprint
from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts
def preview_examples(artifacts):
"""Preview a few records from Examples artifacts."""
pp = pprint.PrettyPrinter()
for artifact in artifacts:
print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
for split in artifact_utils.decode_split_names(artifact.split_names):
print("==== Reading from split:{}".format(split))
split_uri = artifact_utils.get_split_uri([artifact], split)
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(split_uri, name)
for name in os.listdir(split_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames,
compression_type="GZIP")
# Iterate over the first 2 records and decode them.
for tfrecord in dataset.take(2):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
import local_runner
metadata_connection_config = metadata.sqlite_metadata_connection_config(
local_runner.METADATA_PATH)
Now we can read metadata of output artifacts from MLMD.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
# Search all aritfacts from the previous pipeline run.
artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
# Find artifacts of Examples type.
examples_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Examples.TYPE_NAME)
# Find artifacts generated from StatisticsGen.
stats_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ExampleStatistics.TYPE_NAME)
# Find artifacts generated from SchemaGen.
schema_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Schema.TYPE_NAME)
# Find artifacts generated from ExampleValidator.
anomalies_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ExampleAnomalies.TYPE_NAME)
Now we can examine outputs from each component.
Tensorflow Data Validation(TFDV)
is used in StatisticsGen
, SchemaGen
and ExampleValidator
, and TFDV can
be used to visualize outputs from these components.
In this tutorial, we will use visualzation helper methods in TFX which use TFDV internally to show the visualization. Please see TFX components tutorial to learn more about each component.
Examine output form ExampleGen
Let's examine output from ExampleGen. Take a look at the first two examples for each split:
preview_examples(examples_artifacts)
By default, TFX ExampleGen divides examples into two splits, train and eval, but you can adjust your split configuration.
Examine output from StatisticsGen
visualize_artifacts(stats_artifacts)
These statistics are supplied to SchemaGen to construct a schema of data automatically.
Examine output from SchemaGen
visualize_artifacts(schema_artifacts)
This schema is automatically inferred from the output of StatisticsGen. We will use this generated schema in this tutorial, but you also can modify and customize the schema.
Examine output from ExampleValidator
visualize_artifacts(anomalies_artifacts)
If any anomalies were found, you may review your data that all examples follow your assumptions. Outputs from other components like StatistcsGen might be useful. Found anomalies don't block the pipeline execution.
You can see the available features from the outputs of the SchemaGen
. If
your features can be used to construct ML model in Trainer
directly, you can
skip the next step and go to Step 4. Otherwise you can do some feature
engineering work in the next step. Transform
component is needed when
full-pass operations like calculating averages are required, especially when
you need to scale.
Step 3. (Optional) Feature engineering with Transform component.
In this step, you will define various feature engineering job which will be
used by Transform
component in the pipeline. See
Transform component guide
for more information.
This is only necessary if you training code requires additional feature(s) which is not available in the output of ExampleGen. Otherwise, feel free to fast forward to next step of using Trainer.
Define features of the model
models/features.py
contains constants to define features for the model
including feature names, size of vocabulariy and so on. By default penguin
template has two costants, FEATURE_KEYS
and LABEL_KEY
, because our penguin
model solves a classification problem using supervised learning and all
features are continuous numeric features. See
feature definitions from the chicago taxi example
for another example.
Implement preprocessing for training / serving in preprocessing_fn().
Actual feature engineering happens in preprocessing_fn()
function in
models/preprocessing.py
.
In preprocessing_fn
you can define a series of functions that manipulate the
input dict of tensors to produce the output dict of tensors. There are helper
functions like scale_to_0_1
and compute_and_apply_vocabulary
in the
TensorFlow Transform API or you can simply use regular TensorFlow functions.
By default penguin
template includes example usages of
tft.scale_to_z_score
function to normalize feature values.
See Tensflow Transform guide
for more information about authoring preprocessing_fn
.
Add Transform component to the pipeline.
If your preprocessing_fn is ready, add Transform
component to the pipeline.
- In
pipeline/pipeline.py
file, uncomment# components.append(transform)
to add the component to the pipeline.
You can update the pipeline and run again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
2024-04-30 10:27:09.798487: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:27:09.798530: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:27:09.799867: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. 2024-04-30 10:27:17.813772: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:27:17.813818: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:27:17.815189: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Creating a run for pipeline: my_pipeline 2024-04-30 10:27:23.315274: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:27:23.315318: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:27:23.316621: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 4 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_data_format': 6, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/11e5a584-97d6-4d43-9894-a0bc6d79e3c9', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:28.834621', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 4 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/11e5a584-97d6-4d43-9894-a0bc6d79e3c9 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 4 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[StatisticsGen] Resolved inputs: ({'examples': [Artifact(artifact: id: 4 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472849717 last_update_time_since_epoch: 1714472849717 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 5 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472849717 last_update_time_since_epoch: 1714472849717 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/eb8f1511-5826-4bc7-9ab8-418ed49632f8', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:28.834621', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 5 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/eb8f1511-5826-4bc7-9ab8-418ed49632f8 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 5 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[SchemaGen] Resolved inputs: ({'statistics': [Artifact(artifact: id: 5 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472852419 last_update_time_since_epoch: 1714472852419 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 6 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472852419 last_update_time_since_epoch: 1714472852419 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/8954b552-410f-4bd1-b821-e111e72fdbc1', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:28.834621" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:28.834621', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 6 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/8954b552-410f-4bd1-b821-e111e72fdbc1 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" , artifact_type: name: "Schema" )]}) for execution 6 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
If the pipeline ran successfully, you should see "Component Transform is
finished." somewhere in the log. Because Transform
component and
ExampleValidator
component are not dependent to each other, the order of
executions is not fixed. That said, either of Transform
and
ExampleValidator
can be the last component in the pipeline execution.
Examine output from Transform
Transform component creates two kinds of outputs, a Tensorflow graph and transformed examples. The transformed examples are Examples artifact type which is also produced by ExampleGen, but this one contains transformed feature values instead.
You can examine them as we did in the previous step.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
# Search all aritfacts from the previous run of Transform component.
artifacts = get_latest_artifacts(metadata_handler.store,
PIPELINE_NAME, "Transform")
# Find artifacts of Examples type.
transformed_examples_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Examples.TYPE_NAME)
preview_examples(transformed_examples_artifacts)
Step 4. Train your model with Trainer component.
We will build a ML model using Trainer
component. See
Trainer component guide
for more information. You need to provide your model code to the Trainer
component.
Define your model.
In penguin template, models.model.run_fn
is used as run_fn
argument for
Trainer
component. It means that run_fn()
function in models/model.py
will be called when Trainer
component runs. You can see the code to construct
a simple DNN model using keras
API in given code. See
TensorFlow 2.x in TFX
guide for more information about using keras API in TFX.
In this run_fn
, you should build a model and save it to a directory pointed
by fn_args.serving_model_dir
which is specified by the component. You can use
other arguments in fn_args
which is passed into the run_fn
. See
related codes
for the full list of arguments in fn_args
.
Define your features in models/features.py
and use them as needed. If you
have transformed your features in Step 3, you should use transformed features
as inputs to your model.
Add Trainer component to the pipeline.
If your run_fn is ready, add Trainer
component to the pipeline.
- In
pipeline/pipeline.py
file, uncomment# components.append(trainer)
to add the component to the pipeline.
Arguments for the trainer component might depends on whether you use Transform component or not.
- If you do NOT use
Transform
component, you don't need to change the arguments. If you use
Transform
component, you need to change arguments when creating aTrainer
component instance.- Change
examples
argument toexamples=transform.outputs['transformed_examples'],
. We need to use transformed examples for training. - Add
transform_graph
argument liketransform_graph=transform.outputs['transform_graph'],
. This graph contains TensorFlow graph for the transform operations. - After above changes, the code for Trainer component creation will look like following.
# If you use a Transform component. trainer = Trainer( run_fn=run_fn, examples=transform.outputs['transformed_examples'], transform_graph=transform.outputs['transform_graph'], schema=schema_gen.outputs['schema'], ...
- Change
You can update the pipeline and run again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
2024-04-30 10:27:35.563863: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:27:35.563910: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:27:35.565258: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. 2024-04-30 10:27:43.576836: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:27:43.576883: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:27:43.578256: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Creating a run for pipeline: my_pipeline 2024-04-30 10:27:49.088885: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:27:49.088931: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:27:49.090272: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 7 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/6a5361c2-11e1-4af8-a975-a6c2777c2db7', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:54.516464', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 7 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/6a5361c2-11e1-4af8-a975-a6c2777c2db7 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 7 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[StatisticsGen] Resolved inputs: ({'examples': [Artifact(artifact: id: 7 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472875392 last_update_time_since_epoch: 1714472875392 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 8 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472875392 last_update_time_since_epoch: 1714472875392 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/b5e8541e-1a39-41c1-94ad-d31d7ac4feab', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:54.516464', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 8 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/b5e8541e-1a39-41c1-94ad-d31d7ac4feab INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 8 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[SchemaGen] Resolved inputs: ({'statistics': [Artifact(artifact: id: 8 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472878106 last_update_time_since_epoch: 1714472878106 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 9 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472878106 last_update_time_since_epoch: 1714472878106 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/c792daff-d075-40fc-9fc2-f39420f7e410', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:27:54.516464" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:27:54.516464', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 9 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/c792daff-d075-40fc-9fc2-f39420f7e410 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" , artifact_type: name: "Schema" )]}) for execution 9 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
When this execution runs successfully, you have now created and run your first TFX pipeline for your model. Congratulations!
Your new model will be located in some place under the output directory, but it would be better to have a model in fixed location or service outside of the TFX pipeline which holds many interim results. Even better with continuous evaluation of the built model which is critical in ML production systems. We will see how continuous evaluation and deployments work in TFX in the next step.
Step 5. (Optional) Evaluate the model with Evaluator and publish with pusher.
Evaluator
component
continuously evaluate every built model from Trainer
, and
Pusher
copies the model to
a predefined location in the file system or even to
Google Cloud AI Platform Models.
Adds Evaluator component to the pipeline.
In pipeline/pipeline.py
file:
- Uncomment
# components.append(model_resolver)
to add latest model resolver to the pipeline. Evaluator can be used to compare a model with old baseline model which passed Evaluator in last pipeline run.LatestBlessedModelResolver
finds the latest model which passed Evaluator. - Set proper
tfma.MetricsSpec
for your model. Evaluation might be different for every ML model. In the penguin template,SparseCategoricalAccuracy
was used because we are solving a multi category classification problem. You also need to specifytfma.SliceSpec
to analyze your model for specific slices. For more detail, see Evaluator component guide. - Uncomment
# components.append(evaluator)
to add the component to the pipeline.
You can update the pipeline and run again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
2024-04-30 10:28:01.204370: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:28:01.204419: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:28:01.205769: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. 2024-04-30 10:28:09.214390: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:28:09.214435: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:28:09.215797: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Creating a run for pipeline: my_pipeline 2024-04-30 10:28:14.722417: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:28:14.722466: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:28:14.723940: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 10 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_data_format': 6, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/68c4ff50-2af3-41de-8f55-e3b75d43fc35', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:28:20.188504', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 10 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/68c4ff50-2af3-41de-8f55-e3b75d43fc35 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 10 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[StatisticsGen] Resolved inputs: ({'examples': [Artifact(artifact: id: 10 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472901061 last_update_time_since_epoch: 1714472901061 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 11 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472901061 last_update_time_since_epoch: 1714472901061 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/8390ead9-d836-4a9f-8ce2-a0203eb927c2', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:28:20.188504', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 11 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/8390ead9-d836-4a9f-8ce2-a0203eb927c2 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 11 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[SchemaGen] Resolved inputs: ({'statistics': [Artifact(artifact: id: 11 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472903770 last_update_time_since_epoch: 1714472903770 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 12 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472903770 last_update_time_since_epoch: 1714472903770 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/6f7136f8-bba2-4e06-aabd-76e101da6785', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:20.188504" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:28:20.188504', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 12 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/6f7136f8-bba2-4e06-aabd-76e101da6785 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" , artifact_type: name: "Schema" )]}) for execution 12 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Examine output of Evaluator
This step requires TensorFlow Model Analysis(TFMA) Jupyter notebook extension. Note that the version of the TFMA notebook extension should be identical to the version of TFMA python package.
Following command will install TFMA notebook extension from NPM registry. It might take several minutes to complete.
# Install TFMA notebook extension.
jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
(Deprecated) Installing extensions with the jupyter labextension install command is now deprecated and will be removed in a future major version of JupyterLab. Users should manage prebuilt extensions with package managers like pip and conda, and extension authors are encouraged to distribute their extensions as prebuilt packages /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/jupyterlab/debuglog.py:56: UserWarning: An error occurred. warnings.warn("An error occurred.") /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/jupyterlab/debuglog.py:57: UserWarning: ValueError: Please install Node.js and npm before continuing installation. You may be able to install Node.js from your package manager, from conda, or directly from the Node.js website (https://nodejs.org). warnings.warn(msg[-1].strip()) /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/jupyterlab/debuglog.py:58: UserWarning: See the log file for details: /tmpfs/tmp/jupyterlab-debug-c9myd59v.log warnings.warn(f"See the log file for details: {log_path!s}")
If installation is completed, please reload your browser to make the extension take effect.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
# Search all aritfacts from the previous pipeline run.
artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
model_evaluation_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ModelEvaluation.TYPE_NAME)
if model_evaluation_artifacts:
tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
tfma.view.render_slicing_metrics(tfma_result)
Adds Pusher component to the pipeline.
If the model looks promising, we need to publish the model. Pusher component can publish the model to a location in the filesystem or to GCP AI Platform Models using a custom executor.
Evaluator
component continuously evaluate every built model from Trainer
,
and Pusher
copies the model to
a predefined location in the file system or even to
Google Cloud AI Platform Models.
- In
local_runner.py
, setSERVING_MODEL_DIR
to a directory to publish. - In
pipeline/pipeline.py
file, uncomment# components.append(pusher)
to add Pusher to the pipeline.
You can update the pipeline and run again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
2024-04-30 10:28:27.791186: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:28:27.791234: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:28:27.792620: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. 2024-04-30 10:28:35.799472: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:28:35.799516: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:28:35.800903: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Creating a run for pipeline: my_pipeline 2024-04-30 10:28:41.284149: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:28:41.284198: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:28:41.285708: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 13 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2024e0f9-b975-4dec-b7f4-6c9f3a6aee64', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:28:46.802790', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 13 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2024e0f9-b975-4dec-b7f4-6c9f3a6aee64 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 13 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[StatisticsGen] Resolved inputs: ({'examples': [Artifact(artifact: id: 13 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472927698 last_update_time_since_epoch: 1714472927698 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 14 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1714472788,sum_checksum:1714472788" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "Examples" create_time_since_epoch: 1714472927698 last_update_time_since_epoch: 1714472927698 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/96b2280e-a17b-4c15-9885-d85426ca7ba2', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:28:46.802790', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 14 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/96b2280e-a17b-4c15-9885-d85426ca7ba2 INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 14 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[SchemaGen] Resolved inputs: ({'statistics': [Artifact(artifact: id: 14 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472930400 last_update_time_since_epoch: 1714472930400 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 15 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" properties { key: "span" value { int_value: 0 } } properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "stats_dashboard_link" value { string_value: "" } } custom_properties { key: "tfx_version" value { string_value: "1.15.0" } } state: LIVE type: "ExampleStatistics" create_time_since_epoch: 1714472930400 last_update_time_since_epoch: 1714472930400 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/717ce417-3db8-463d-adbe-f6ef853635de', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2024-04-30T10:28:46.802790" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2024-04-30T10:28:46.802790', top_level_pipeline_run_id=None, frontend_url=None) INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 15 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Deleted stateful_working_dir ./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/717ce417-3db8-463d-adbe-f6ef853635de INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" , artifact_type: name: "Schema" )]}) for execution 15 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
You should be able to find your new model at SERVING_MODEL_DIR
.
Step 6. (Optional) Deploy your pipeline to Kubeflow Pipelines on GCP.
As mentioned earlier, local_runner.py
is good for debugging or development
purpose but not a best solution for production workloads. In this step, we will
deploy the pipeline to Kubeflow Pipelines on Google Cloud.
Preparation
We need kfp
python package and skaffold
program to deploy a pipeline to a
Kubeflow Pipelines cluster.
pip install --upgrade -q kfp
# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold
You need to move skaffold
binary to the place where your shell can find it.
Or you can specify the path to skaffold when you run tfx
binary with
--skaffold-cmd
flag.
# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory
You also need a Kubeflow Pipelines cluster to run the pipeline. Please follow Step 1 and 2 in TFX on Cloud AI Platform Pipelines tutorial.
When your cluster is ready, open the pipeline dashboard by clicking
Open Pipelines Dashboard in the
Pipelines
page of the Google cloud console.
The URL of this page is ENDPOINT
to request a pipeline run. The endpoint
value is everything in the URL after the https://, up to, and including,
googleusercontent.com. Put your endpoint to following code block.
ENDPOINT='' # Enter your ENDPOINT here.
To run our code in a Kubeflow Pipelines cluster, we need to pack our code into
a container image. The image will be built automatically while deploying our
pipeline, and you only need to set a name and an container registry for your
image. In our example, we will use
Google Container registry,
and name it tfx-pipeline
.
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'
Set data location.
Your data should be accessible from the Kubeflow Pipelines cluster. If you have used data in your local environment, you might need to upload it to remote storage like Google Cloud Storage. For example, we can upload penguin data to a default bucket which is created automatically when a Kubeflow Pipelines cluster is deployed like following.
gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]... NotFoundException: 404 The destination bucket gs://tensorflow-testing-kubeflowpipelines-default does not exist or the write to the destination must be restarted
Update the data location stored at DATA_PATH
in kubeflow_runner.py
.
If you are using BigQueryExampleGen, there is no need to upload the data file,
but please make sure that kubeflow_runner.py
uses the same query
and
beam_pipeline_args
argument for pipeline.create_pipeline()
function.
Deploy the pipeline.
If everything is ready, you can create a pipeline using tfx pipeline create
command.
!tfx pipeline create \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
2024-04-30 10:29:03.154964: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:29:03.155012: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:29:03.156373: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI [Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.
Now start an execution run with the newly created pipeline using the
tfx run create
command.
tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
2024-04-30 10:29:07.281061: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:29:07.281111: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:29:07.282477: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered CLI Creating a run for pipeline: my_pipeline Traceback (most recent call last): File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module> sys.exit(cli_group()) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/core.py", line 1157, in __call__ return self.main(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/core.py", line 1078, in main rv = self.invoke(ctx) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/core.py", line 1688, in invoke return _process_result(sub_ctx.command.invoke(sub_ctx)) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/core.py", line 1688, in invoke return _process_result(sub_ctx.command.invoke(sub_ctx)) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/core.py", line 1434, in invoke return ctx.invoke(self.callback, **ctx.params) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/core.py", line 783, in invoke return __callback(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/decorators.py", line 92, in new_func return ctx.invoke(f, obj, *args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/click/core.py", line 783, in invoke return __callback(*args, **kwargs) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx/tools/cli/commands/run.py", line 94, in create_run handler = handler_factory.create_handler(ctx.flags_dict) File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx/tools/cli/handler/handler_factory.py", line 92, in create_handler from tfx.tools.cli.handler import kubeflow_handler # pylint: disable=g-import-not-at-top File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 26, in <module> from tfx.orchestration.kubeflow import kubeflow_dag_runner File "/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx/orchestration/kubeflow/kubeflow_dag_runner.py", line 24, in <module> from kfp import gcp ImportError: cannot import name 'gcp' from 'kfp' (/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/kfp/__init__.py)
Or, you can also run the pipeline in the Kubeflow Pipelines dashboard. The new
run will be listed under Experiments
in the Kubeflow Pipelines dashboard.
Clicking into the experiment will allow you to monitor progress and visualize
the artifacts created during the execution run.
If you are interested in running your pipeline on Kubeflow Pipelines, find more instructions in TFX on Cloud AI Platform Pipelines tutorial.
Cleaning up
To clean up all Google Cloud resources used in this step, you can delete the Google Cloud project you used for the tutorial.
Alternatively, you can clean up individual resources by visiting each consoles: