Создайте конвейер TFX для ваших данных с помощью шаблона Penguin


Этот документ содержит инструкции по созданию конвейера TensorFlow Extended (TFX) для вашего собственного набора данных с использованием шаблона Penguin, который предоставляется с пакетом TFX Python. Созданный конвейер изначально будет использовать набор данных Palmer Penguins , но мы преобразуем конвейер для вашего набора данных.


  • Линукс / МакОС
  • Питон 3.6-3.8
  • блокнот Юпитер

Шаг 1. Скопируйте предопределенный шаблон в каталог вашего проекта.

На этом этапе мы создадим рабочий каталог и файлы проекта конвейера, скопировав файлы из шаблона пингвина в TFX. Вы можете думать об этом как о каркасе для вашего проекта конвейера TFX.

Обновить пункт

Если мы работаем в Colab, мы должны убедиться, что у нас установлена ​​последняя версия Pip. Локальные системы, конечно, могут быть обновлены отдельно.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

Установить необходимый пакет

Сначала установите TFX и анализ моделей TensorFlow (TFMA).

pip install -U tfx tensorflow-model-analysis

Давайте проверим версии TFX.

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx

print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0

Мы готовы создать конвейер.

Установите PROJECT_DIR в соответствующее место назначения для вашей среды. Значение по умолчанию — ~/imported/${PIPELINE_NAME} , которое подходит для среды ноутбуков Google Cloud AI Platform .

Вы можете дать своему конвейеру другое имя, изменив PIPELINE_NAME ниже. Это также станет именем каталога проекта, в который будут помещены ваши файлы.

import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)

Скопируйте файлы шаблонов.

TFX включает шаблон penguin с пакетом python TFX. шаблон penguin содержит множество инструкций по переносу вашего набора данных в конвейер, что и является целью этого руководства.

Команда интерфейса командной строки tfx template copy копирует предварительно определенные файлы шаблонов в каталог вашего проекта.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
%env PATH={PATH}:/home/jupyter/.local/bin

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
Copying penguin pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py

Измените контекст рабочего каталога в этой записной книжке на каталог проекта.


Просмотрите скопированные исходные файлы

Шаблон TFX предоставляет основные файлы шаблонов для построения конвейера, включая исходный код Python и образцы данных. Шаблон penguin использует тот же набор данных Palmer Penguins и модель машинного обучения, что и пример Penguin .

Вот краткое введение в каждый из файлов Python.

  • pipeline — этот каталог содержит определение конвейера
    • configs.py — определяет общие константы для запуска конвейера.
    • pipeline.py — определяет компоненты TFX и конвейер
  • models — этот каталог содержит определения моделей машинного обучения.
    • features.py , features_test.py — определяет функции для модели
    • preprocessing.py , preprocessing_test.py — определяет процедуры предварительной обработки данных.
    • constants.py — определяет константы модели
    • model.py , model_test.py — определяет модель машинного обучения с использованием фреймворков машинного обучения, таких как TensorFlow.
  • local_runner.py — определяет бегун для локальной среды, который использует локальный механизм оркестровки.
  • kubeflow_runner.py — определение исполнителя для механизма оркестрации Kubeflow Pipelines.

По умолчанию шаблон включает только стандартные компоненты TFX. Если вам нужны какие-то настраиваемые действия, вы можете создать настраиваемые компоненты для своего конвейера. Подробности см. в руководстве по пользовательским компонентам TFX .

Файлы юнит-тестов.

Вы могли заметить, что в имени некоторых файлов _test.py . Это модульные тесты конвейера, и рекомендуется добавлять дополнительные модульные тесты по мере реализации собственных конвейеров. Вы можете запускать модульные тесты, указав имя модуля тестовых файлов с флагом -m . Обычно вы можете получить имя модуля, удалив расширение .py и заменив / на . . Например:

import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s
[       OK ] FeaturesTest.testLabelKey
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
Ran 2 tests in 0.001s

OK (skipped=1)

Создайте конвейер TFX в локальной среде.

TFX поддерживает несколько механизмов оркестровки для запуска конвейеров. Мы будем использовать локальный механизм оркестровки. Механизм локальной оркестровки работает без каких-либо дополнительных зависимостей и подходит для разработки и отладки, поскольку работает в локальной среде, а не зависит от удаленных вычислительных кластеров.

Мы будем использовать local_runner.py для запуска вашего конвейера с помощью локального оркестратора. Вы должны создать конвейер перед его запуском. Вы можете создать конвейер с помощью команды pipeline create .

tfx pipeline create --engine=local --pipeline_path=local_runner.py
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.

Команда pipeline create регистрирует ваш конвейер, определенный в local_runner.py , фактически не запуская его.

Вы запустите созданный конвейер с помощью команды run create в следующих шагах.

Шаг 2. Загрузите ВАШИ данные в конвейер.

Начальный конвейер принимает набор данных о пингвинах, включенный в шаблон. Вам нужно поместить свои данные в конвейер, и большинство конвейеров TFX начинаются с компонента ExampleGen.

Выберите ExampleGen

Ваши данные могут храниться в любом месте, к которому может получить доступ ваш конвейер, либо в локальной, либо в распределенной файловой системе, либо в системе с поддержкой запросов. TFX предоставляет различные компоненты ExampleGen для переноса ваших данных в конвейер TFX. Вы можете выбрать один из следующих примеров создания компонентов.

Вы также можете создать свой собственный ExampleGen, например, tfx включает собственный ExecampleGen, который использует Presto в качестве источника данных. См . руководство для получения дополнительной информации о том, как использовать и разрабатывать собственные исполнители.

Как только вы решите, какой ExampleGen использовать, вам нужно будет изменить определение конвейера, чтобы использовать ваши данные.

  1. Измените DATA_PATH в local_runner.py и задайте для него расположение ваших файлов.

    • Если у вас есть файлы в локальной среде, укажите путь. Это лучший вариант для разработки или отладки конвейера.
    • Если файлы хранятся в GCS, вы можете использовать путь, начинающийся с gs://{bucket_name}/... . Убедитесь, что вы можете получить доступ к GCS со своего терминала, например, с помощью gsutil . При необходимости следуйте инструкциям по авторизации в Google Cloud .
    • Если вы хотите использовать ExampleGen на основе запроса, например BigQueryExampleGen, вам потребуется оператор Query для выбора данных из источника данных. Есть еще несколько вещей, которые вам нужно настроить, чтобы использовать Google Cloud BigQuery в качестве источника данных.
    • В pipeline/configs.py :
      • Измените GOOGLE_CLOUD_PROJECT и GCS_BUCKET_NAME на свой проект GCP и имя корзины. Ведро должно существовать до того, как мы запустим конвейер.
      • Раскомментируйте и установите переменную BIG_QUERY_QUERY в оператор запроса .
    • В local_runner.py :
      • Вместо этого закомментируйте аргумент data_path и раскомментируйте аргумент query в pipe.create_pipeline pipeline.create_pipeline() .
    • В pipeline/pipeline.py :
      • Закомментируйте аргумент data_path и раскомментируйте аргумент query в create_pipeline() .
      • Используйте BigQueryExampleGen вместо CsvExampleGen.
  2. Замените существующий CsvExampleGen своим классом ExampleGen в pipeline/pipeline.py . Каждый класс ExampleGen имеет разную подпись. Пожалуйста, смотрите руководство по компоненту ExampleGen для более подробной информации. Не забудьте импортировать необходимые модули с операторами import в pipeline/pipeline.py .

Начальный конвейер состоит из четырех компонентов: ExampleGen , StatisticsGen , SchemaGen и ExampleValidator . Нам не нужно ничего менять для StatisticsGen , SchemaGen и ExampleValidator . Давайте запустим конвейер в первый раз.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:12.848598153    5127 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886553302
last_update_time_since_epoch: 1643886553302
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886556588
last_update_time_since_epoch: 1643886556588
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Вы должны увидеть «Валидатор примера компонента завершен». если конвейер прошел успешно.

Изучите вывод конвейера.

Конвейер TFX создает два вида выходных данных: артефакты и базу данных метаданных (MLMD) , которая содержит метаданные артефактов и выполнения конвейера. Расположение вывода определяется в local_runner.py . По умолчанию артефакты хранятся в tfx_pipeline_output а метаданные хранятся в виде базы данных sqlite в каталоге tfx_metadata .

Вы можете использовать API-интерфейсы MLMD для проверки этих выходных данных. Во-первых, мы определим некоторые служебные функции для поиска только что созданных выходных артефактов.

import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils

# TODO(b/171447278): Move these functions into TFX library.

def get_latest_executions(store, pipeline_name, component_id = None):
  """Fetch all pipeline runs."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)

def get_latest_artifacts(store, pipeline_name, component_id = None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)

  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  return store.get_artifacts_by_id(artifact_ids)

def find_latest_artifacts_by_type(store, artifacts, artifact_type):
  """Get the latest artifacts of a specified type."""
  # Get type information from MLMD
    artifact_type = store.get_artifact_type(artifact_type)
  except errors.NotFoundError:
    return []
  # Filter artifacts with type.
  filtered_artifacts = [aritfact for aritfact in artifacts
                        if aritfact.type_id == artifact_type.id]
  # Convert MLMD artifact data into TFX Artifact instances.
  return [artifact_utils.deserialize_artifact(artifact_type, artifact)
      for artifact in filtered_artifacts]

from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
    if visualization:

from tfx.orchestration.experimental.interactive import standard_visualizations

import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)

      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(

Теперь мы можем читать метаданные выходных артефактов из MLMD.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous pipeline run.
    artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
    # Find artifacts of Examples type.
    examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
    # Find artifacts generated from StatisticsGen.
    stats_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
    # Find artifacts generated from SchemaGen.
    schema_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
    # Find artifacts generated from ExampleValidator.
    anomalies_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,

Теперь мы можем изучить выходные данные каждого компонента. Проверка данных Tensorflow (TFDV) используется в StatisticsGen , SchemaGen и ExampleValidator , а TFDV можно использовать для визуализации выходных данных этих компонентов.

В этом руководстве мы будем использовать вспомогательные методы визуализации в TFX, которые используют TFDV внутри для отображения визуализации. См. руководство по компонентам TFX, чтобы узнать больше о каждом компоненте.

Изучите форму вывода ExampleGen

Давайте рассмотрим вывод из ExampleGen. Взгляните на первые два примера для каждого разделения:


По умолчанию TFX ExampleGen делит примеры на два сплита, train и eval , но вы можете настроить свою конфигурацию сплита .

Изучите вывод из StatisticsGen


Эти статистические данные предоставляются SchemaGen для автоматического построения схемы данных.

Изучите выходные данные SchemaGen


Эта схема автоматически выводится из выходных данных StatisticsGen. Мы будем использовать эту сгенерированную схему в этом руководстве, но вы также можете изменять и настраивать схему .

Изучите вывод из ExampleValidator


Если были обнаружены какие-либо аномалии, вы можете проверить свои данные, чтобы все примеры соответствовали вашим предположениям. Выходные данные других компонентов, таких как StatistcsGen, могут быть полезны. Найденные аномалии не блокируют выполнение конвейера.

Вы можете увидеть доступные функции из выходных данных SchemaGen . Если ваши функции можно использовать для построения модели ML напрямую в Trainer , вы можете пропустить следующий шаг и перейти к шагу 4. В противном случае вы можете выполнить некоторую работу по разработке функций на следующем шаге. Компонент Transform необходим, когда требуются операции полного прохода, такие как вычисление средних значений, особенно когда вам нужно масштабировать.

Шаг 3. (Необязательно) Создание признаков с помощью компонента Transform.

На этом этапе вы определите различные задания по разработке функций, которые будут использоваться компонентом Transform в конвейере. Дополнительные сведения см. в руководстве по компоненту Transform .

Это необходимо только в том случае, если для обучающего кода требуются дополнительные функции, которые недоступны в выходных данных ExampleGen. В противном случае не стесняйтесь быстро перейти к следующему шагу использования Trainer.

Определить особенности модели

models/features.py содержит константы для определения функций модели, включая имена функций, размер словаря и так далее. По умолчанию шаблон penguin имеет две константы, FEATURE_KEYS и LABEL_KEY , потому что наша модель penguin решает проблему классификации с использованием обучения с учителем, а все функции являются непрерывными числовыми функциями. См. определения функций из примера чикагского такси в качестве другого примера.

Реализовать препроцессинг для обучения/обслуживания в preprocessing_fn().

Фактическая разработка функций происходит в функции preprocessing_fn() в models/preprocessing.py .

В preprocessing_fn вы можете определить ряд функций, которые манипулируют входным словарем тензоров для создания выходного словаря тензоров. В compute_and_apply_vocabulary Transform API есть вспомогательные функции, такие как scale_to_0_1 и calculate_and_apply_vocabulary, или вы можете просто использовать обычные функции TensorFlow. По умолчанию шаблон penguin включает примеры использования функции tft.scale_to_z_score для нормализации значений функций.

См. Руководство по преобразованию Tensflow для получения дополнительной информации о создании preprocessing_fn .

Добавьте компонент Transform в конвейер.

Если ваш preprocessing_fn готов, добавьте компонент Transform в конвейер.

  1. В файле pipeline/pipeline.py раскомментируйте # components.append(transform) , чтобы добавить компонент в конвейер.

Вы можете обновить конвейер и запустить его снова.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:37.596944686    5287 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886578210
last_update_time_since_epoch: 1643886578210
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886581527
last_update_time_since_epoch: 1643886581527
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Если конвейер запущен успешно, вы должны увидеть сообщение «Преобразование компонента завершено». где- то в журнале. Поскольку компонент Transform и компонент ExampleValidator не зависят друг от друга, порядок выполнения не является фиксированным. Тем не менее, любой из Transform и ExampleValidator может быть последним компонентом в выполнении конвейера.

Изучите выходные данные Transform

Компонент Transform создает два вида выходных данных: график Tensorflow и преобразованные примеры. Преобразованные примеры относятся к типу артефакта Examples, который также создается ExampleGen, но вместо этого содержит преобразованные значения функций.

Вы можете изучить их, как мы это делали на предыдущем шаге.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous run of Transform component.
    artifacts = get_latest_artifacts(metadata_handler.store,
                                     PIPELINE_NAME, "Transform")
    # Find artifacts of Examples type.
    transformed_examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,

Шаг 4. Обучите модель с помощью компонента Trainer.

Мы построим модель ML, используя компонент Trainer . Дополнительную информацию см. в руководстве по компоненту Trainer . Вам необходимо предоставить код модели для компонента Trainer.

Определите свою модель.

В шаблоне пингвина models.model.run_fn используется как аргумент run_fn для компонента Trainer . Это означает, что run_fn() в models/model.py будет вызываться при запуске компонента Trainer . Вы можете увидеть код для построения простой модели DNN с использованием keras API в данном коде. См. TensorFlow 2.x в руководстве по TFX для получения дополнительной информации об использовании keras API в TFX.

В этом run_fn вы должны построить модель и сохранить ее в каталог, указанный fn_args.serving_model_dir , который указан компонентом. Вы можете использовать другие аргументы в fn_args , которые передаются в run_fn . Полный список аргументов в fn_args . в соответствующих кодах.

Определите свои функции в models/features.py и используйте их по мере необходимости. Если вы преобразовали свои объекты на шаге 3, вы должны использовать преобразованные объекты в качестве входных данных для своей модели.

Добавьте компонент Trainer в конвейер.

Если ваш run_fn готов, добавьте компонент Trainer в конвейер.

  1. В файле pipeline/pipeline.py раскомментируйте # components.append(trainer) , чтобы добавить компонент в конвейер.

Аргументы для компонента тренера могут зависеть от того, используете ли вы компонент Transform или нет.

  • Если вы НЕ используете компонент Transform , вам не нужно изменять аргументы.
  • Если вы используете компонент Transform , вам необходимо изменить аргументы при создании экземпляра компонента Trainer .

    • Измените аргумент examples на examples=transform.outputs['transformed_examples'], . Нам нужно использовать преобразованные примеры для обучения.
    • Добавьте аргумент transform_graph , например, transform_graph=transform.outputs['transform_graph'], . Этот граф содержит граф TensorFlow для операций преобразования.
    • После вышеуказанных изменений код для создания компонента Trainer будет выглядеть следующим образом.
    # If you use a Transform component.
    trainer = Trainer(

Вы можете обновить конвейер и запустить его снова.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 7
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:01.173700221    5436 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
Когда это выполнение выполняется успешно, вы создали и запустили свой первый конвейер TFX для своей модели. Поздравляем!

Ваша новая модель будет расположена в каком-то месте в выходном каталоге, но было бы лучше иметь модель в фиксированном месте или службе за пределами конвейера TFX, которая содержит много промежуточных результатов. Еще лучше с непрерывной оценкой построенной модели, которая имеет решающее значение в производственных системах машинного обучения. На следующем шаге мы увидим, как непрерывная оценка и развертывание работают в TFX.

Шаг 5. (Необязательно) Оцените модель с помощью Evaluator и опубликуйте с помощью pusher.

Компонент Evaluator постоянно оценивает каждую построенную модель из Trainer , а Pusher копирует модель в предопределенное место в файловой системе или даже в Google Cloud AI Platform Models .

Добавляет компонент Evaluator в конвейер.

В файле pipeline/pipeline.py :

  1. Раскомментируйте # components.append(model_resolver) , чтобы добавить последний преобразователь модели в конвейер. Evaluator можно использовать для сравнения модели со старой базовой моделью, которая прошла Evaluator при последнем запуске конвейера. LatestBlessedModelResolver находит последнюю модель, прошедшую проверку.
  2. Установите правильный tfma.MetricsSpec для вашей модели. Оценка может отличаться для каждой модели машинного обучения. В шаблоне пингвина использовалась SparseCategoricalAccuracy , потому что мы решаем проблему классификации нескольких категорий. Вам также необходимо указать tfma.SliceSpec для анализа вашей модели на наличие определенных срезов. Дополнительные сведения см. в руководстве по компоненту Evaluator .
  3. Раскомментируйте # components.append(evaluator) , чтобы добавить компонент в конвейер.

Вы сможете найти свою новую модель в SERVING_MODEL_DIR .

Если вы заинтересованы в запуске своего конвейера на Kubeflow Pipelines, найдите дополнительные инструкции в учебнике TFX on Cloud AI Platform Pipelines .


Чтобы очистить все ресурсы Google Cloud, используемые на этом этапе, вы можете удалить проект Google Cloud, который вы использовали для руководства.

Кроме того, вы можете очистить отдельные ресурсы, посетив каждую консоль: