Создайте конвейер TFX для ваших данных с помощью шаблона Penguin


Введение

Этот документ содержит инструкции по созданию конвейера TensorFlow Extended (TFX) для вашего собственного набора данных с использованием шаблона Penguin, который предоставляется с пакетом TFX Python. Созданный конвейер изначально будет использовать набор данных Palmer Penguins , но мы преобразуем конвейер для вашего набора данных.

Предпосылки

  • Линукс / МакОС
  • Питон 3.6-3.8
  • блокнот Юпитер

Шаг 1. Скопируйте предопределенный шаблон в каталог вашего проекта.

На этом этапе мы создадим рабочий каталог и файлы проекта конвейера, скопировав файлы из шаблона пингвина в TFX. Вы можете думать об этом как о каркасе для вашего проекта конвейера TFX.

Обновить пункт

Если мы работаем в Colab, мы должны убедиться, что у нас установлена ​​последняя версия Pip. Локальные системы, конечно, могут быть обновлены отдельно.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

Установить необходимый пакет

Сначала установите TFX и анализ моделей TensorFlow (TFMA).

pip install -U tfx tensorflow-model-analysis

Давайте проверим версии TFX.

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx

print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0

Мы готовы создать конвейер.

Установите PROJECT_DIR в соответствующее место назначения для вашей среды. Значение по умолчанию — ~/imported/${PIPELINE_NAME} , которое подходит для среды ноутбуков Google Cloud AI Platform .

Вы можете дать своему конвейеру другое имя, изменив PIPELINE_NAME ниже. Это также станет именем каталога проекта, в который будут помещены ваши файлы.

PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)

Скопируйте файлы шаблонов.

TFX включает шаблон penguin с пакетом python TFX. шаблон penguin содержит множество инструкций по переносу вашего набора данных в конвейер, что и является целью этого руководства.

Команда интерфейса командной строки tfx template copy копирует предварительно определенные файлы шаблонов в каталог вашего проекта.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
CLI
Copying penguin pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py

Измените контекст рабочего каталога в этой записной книжке на каталог проекта.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Просмотрите скопированные исходные файлы

Шаблон TFX предоставляет основные файлы шаблонов для построения конвейера, включая исходный код Python и образцы данных. Шаблон penguin использует тот же набор данных Palmer Penguins и модель машинного обучения, что и пример Penguin .

Вот краткое введение в каждый из файлов Python.

  • pipeline — этот каталог содержит определение конвейера
    • configs.py — определяет общие константы для запуска конвейера.
    • pipeline.py — определяет компоненты TFX и конвейер
  • models — этот каталог содержит определения моделей машинного обучения.
    • features.py , features_test.py — определяет функции для модели
    • preprocessing.py , preprocessing_test.py — определяет процедуры предварительной обработки данных.
    • constants.py — определяет константы модели
    • model.py , model_test.py — определяет модель машинного обучения с использованием фреймворков машинного обучения, таких как TensorFlow.
  • local_runner.py — определяет бегун для локальной среды, который использует локальный механизм оркестровки.
  • kubeflow_runner.py — определение исполнителя для механизма оркестрации Kubeflow Pipelines.

По умолчанию шаблон включает только стандартные компоненты TFX. Если вам нужны какие-то настраиваемые действия, вы можете создать настраиваемые компоненты для своего конвейера. Подробности см. в руководстве по пользовательским компонентам TFX .

Файлы юнит-тестов.

Вы могли заметить, что в имени некоторых файлов _test.py . Это модульные тесты конвейера, и рекомендуется добавлять дополнительные модульные тесты по мере реализации собственных конвейеров. Вы можете запускать модульные тесты, указав имя модуля тестовых файлов с флагом -m . Обычно вы можете получить имя модуля, удалив расширение .py и заменив / на . . Например:

import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s
[       OK ] FeaturesTest.testLabelKey
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK (skipped=1)

Создайте конвейер TFX в локальной среде.

TFX поддерживает несколько механизмов оркестровки для запуска конвейеров. Мы будем использовать локальный механизм оркестровки. Механизм локальной оркестровки работает без каких-либо дополнительных зависимостей и подходит для разработки и отладки, поскольку работает в локальной среде, а не зависит от удаленных вычислительных кластеров.

Мы будем использовать local_runner.py для запуска вашего конвейера с помощью локального оркестратора. Вы должны создать конвейер перед его запуском. Вы можете создать конвейер с помощью команды pipeline create .

tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.

Команда pipeline create регистрирует ваш конвейер, определенный в local_runner.py , фактически не запуская его.

Вы запустите созданный конвейер с помощью команды run create в следующих шагах.

Шаг 2. Загрузите ВАШИ данные в конвейер.

Начальный конвейер принимает набор данных о пингвинах, включенный в шаблон. Вам нужно поместить свои данные в конвейер, и большинство конвейеров TFX начинаются с компонента ExampleGen.

Выберите ExampleGen

Ваши данные могут храниться в любом месте, к которому может получить доступ ваш конвейер, либо в локальной, либо в распределенной файловой системе, либо в системе с поддержкой запросов. TFX предоставляет различные компоненты ExampleGen для переноса ваших данных в конвейер TFX. Вы можете выбрать один из следующих примеров создания компонентов.

Вы также можете создать свой собственный ExampleGen, например, tfx включает собственный ExecampleGen, который использует Presto в качестве источника данных. См . руководство для получения дополнительной информации о том, как использовать и разрабатывать собственные исполнители.

Как только вы решите, какой ExampleGen использовать, вам нужно будет изменить определение конвейера, чтобы использовать ваши данные.

  1. Измените DATA_PATH в local_runner.py и задайте для него расположение ваших файлов.

    • Если у вас есть файлы в локальной среде, укажите путь. Это лучший вариант для разработки или отладки конвейера.
    • Если файлы хранятся в GCS, вы можете использовать путь, начинающийся с gs://{bucket_name}/... . Убедитесь, что вы можете получить доступ к GCS со своего терминала, например, с помощью gsutil . При необходимости следуйте инструкциям по авторизации в Google Cloud .
    • Если вы хотите использовать ExampleGen на основе запроса, например BigQueryExampleGen, вам потребуется оператор Query для выбора данных из источника данных. Есть еще несколько вещей, которые вам нужно настроить, чтобы использовать Google Cloud BigQuery в качестве источника данных.
    • В pipeline/configs.py :
      • Измените GOOGLE_CLOUD_PROJECT и GCS_BUCKET_NAME на свой проект GCP и имя корзины. Ведро должно существовать до того, как мы запустим конвейер.
      • BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS переменную BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS.
      • Раскомментируйте и установите переменную BIG_QUERY_QUERY в оператор запроса .
    • В local_runner.py :
      • Вместо этого закомментируйте аргумент data_path и раскомментируйте аргумент query в pipe.create_pipeline pipeline.create_pipeline() .
    • В pipeline/pipeline.py :
      • Закомментируйте аргумент data_path и раскомментируйте аргумент query в create_pipeline() .
      • Используйте BigQueryExampleGen вместо CsvExampleGen.
  2. Замените существующий CsvExampleGen своим классом ExampleGen в pipeline/pipeline.py . Каждый класс ExampleGen имеет разную подпись. Пожалуйста, смотрите руководство по компоненту ExampleGen для более подробной информации. Не забудьте импортировать необходимые модули с операторами import в pipeline/pipeline.py .

Начальный конвейер состоит из четырех компонентов: ExampleGen , StatisticsGen , SchemaGen и ExampleValidator . Нам не нужно ничего менять для StatisticsGen , SchemaGen и ExampleValidator . Давайте запустим конвейер в первый раз.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:12.848598153    5127 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886553302
last_update_time_since_epoch: 1643886553302
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886556588
last_update_time_since_epoch: 1643886556588
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Вы должны увидеть «Валидатор примера компонента завершен». если конвейер прошел успешно.

Изучите вывод конвейера.

Конвейер TFX создает два вида выходных данных: артефакты и базу данных метаданных (MLMD) , которая содержит метаданные артефактов и выполнения конвейера. Расположение вывода определяется в local_runner.py . По умолчанию артефакты хранятся в tfx_pipeline_output а метаданные хранятся в виде базы данных sqlite в каталоге tfx_metadata .

Вы можете использовать API-интерфейсы MLMD для проверки этих выходных данных. Во-первых, мы определим некоторые служебные функции для поиска только что созданных выходных артефактов.

import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils

# TODO(b/171447278): Move these functions into TFX library.

def get_latest_executions(store, pipeline_name, component_id = None):
  """Fetch all pipeline runs."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
    ]
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
    ]
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)

def get_latest_artifacts(store, pipeline_name, component_id = None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)

  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  ]
  return store.get_artifacts_by_id(artifact_ids)

def find_latest_artifacts_by_type(store, artifacts, artifact_type):
  """Get the latest artifacts of a specified type."""
  # Get type information from MLMD
  try:
    artifact_type = store.get_artifact_type(artifact_type)
  except errors.NotFoundError:
    return []
  # Filter artifacts with type.
  filtered_artifacts = [aritfact for aritfact in artifacts
                        if aritfact.type_id == artifact_type.id]
  # Convert MLMD artifact data into TFX Artifact instances.
  return [artifact_utils.deserialize_artifact(artifact_type, artifact)
      for artifact in filtered_artifacts]


from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)

      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
                                        compression_type="GZIP")
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()
        example.ParseFromString(serialized_example)
        pp.pprint(example)

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(
              local_runner.METADATA_PATH)

Теперь мы можем читать метаданные выходных артефактов из MLMD.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous pipeline run.
    artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
    # Find artifacts of Examples type.
    examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Examples.TYPE_NAME)
    # Find artifacts generated from StatisticsGen.
    stats_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleStatistics.TYPE_NAME)
    # Find artifacts generated from SchemaGen.
    schema_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Schema.TYPE_NAME)
    # Find artifacts generated from ExampleValidator.
    anomalies_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleAnomalies.TYPE_NAME)

Теперь мы можем изучить выходные данные каждого компонента. Проверка данных Tensorflow (TFDV) используется в StatisticsGen , SchemaGen и ExampleValidator , а TFDV можно использовать для визуализации выходных данных этих компонентов.

В этом руководстве мы будем использовать вспомогательные методы визуализации в TFX, которые используют TFDV внутри для отображения визуализации. См. руководство по компонентам TFX, чтобы узнать больше о каждом компоненте.

Изучите форму вывода ExampleGen

Давайте рассмотрим вывод из ExampleGen. Взгляните на первые два примера для каждого разделения:

preview_examples(examples_artifacts)

По умолчанию TFX ExampleGen делит примеры на два сплита, train и eval , но вы можете настроить свою конфигурацию сплита .

Изучите вывод из StatisticsGen

visualize_artifacts(stats_artifacts)

Эти статистические данные предоставляются SchemaGen для автоматического построения схемы данных.

Изучите выходные данные SchemaGen

visualize_artifacts(schema_artifacts)

Эта схема автоматически выводится из выходных данных StatisticsGen. Мы будем использовать эту сгенерированную схему в этом руководстве, но вы также можете изменять и настраивать схему .

Изучите вывод из ExampleValidator

visualize_artifacts(anomalies_artifacts)

Если были обнаружены какие-либо аномалии, вы можете проверить свои данные, чтобы все примеры соответствовали вашим предположениям. Выходные данные других компонентов, таких как StatistcsGen, могут быть полезны. Найденные аномалии не блокируют выполнение конвейера.

Вы можете увидеть доступные функции из выходных данных SchemaGen . Если ваши функции можно использовать для построения модели ML напрямую в Trainer , вы можете пропустить следующий шаг и перейти к шагу 4. В противном случае вы можете выполнить некоторую работу по разработке функций на следующем шаге. Компонент Transform необходим, когда требуются операции полного прохода, такие как вычисление средних значений, особенно когда вам нужно масштабировать.

Шаг 3. (Необязательно) Создание признаков с помощью компонента Transform.

На этом этапе вы определите различные задания по разработке функций, которые будут использоваться компонентом Transform в конвейере. Дополнительные сведения см. в руководстве по компоненту Transform .

Это необходимо только в том случае, если для обучающего кода требуются дополнительные функции, которые недоступны в выходных данных ExampleGen. В противном случае не стесняйтесь быстро перейти к следующему шагу использования Trainer.

Определить особенности модели

models/features.py содержит константы для определения функций модели, включая имена функций, размер словаря и так далее. По умолчанию шаблон penguin имеет две константы, FEATURE_KEYS и LABEL_KEY , потому что наша модель penguin решает проблему классификации с использованием обучения с учителем, а все функции являются непрерывными числовыми функциями. См. определения функций из примера чикагского такси в качестве другого примера.

Реализовать препроцессинг для обучения/обслуживания в preprocessing_fn().

Фактическая разработка функций происходит в функции preprocessing_fn() в models/preprocessing.py .

В preprocessing_fn вы можете определить ряд функций, которые манипулируют входным словарем тензоров для создания выходного словаря тензоров. В compute_and_apply_vocabulary Transform API есть вспомогательные функции, такие как scale_to_0_1 и calculate_and_apply_vocabulary, или вы можете просто использовать обычные функции TensorFlow. По умолчанию шаблон penguin включает примеры использования функции tft.scale_to_z_score для нормализации значений функций.

См. Руководство по преобразованию Tensflow для получения дополнительной информации о создании preprocessing_fn .

Добавьте компонент Transform в конвейер.

Если ваш preprocessing_fn готов, добавьте компонент Transform в конвейер.

  1. В файле pipeline/pipeline.py раскомментируйте # components.append(transform) , чтобы добавить компонент в конвейер.

Вы можете обновить конвейер и запустить его снова.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:37.596944686    5287 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886578210
last_update_time_since_epoch: 1643886578210
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886581527
last_update_time_since_epoch: 1643886581527
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Если конвейер запущен успешно, вы должны увидеть сообщение «Преобразование компонента завершено». где- то в журнале. Поскольку компонент Transform и компонент ExampleValidator не зависят друг от друга, порядок выполнения не является фиксированным. Тем не менее, любой из Transform и ExampleValidator может быть последним компонентом в выполнении конвейера.

Изучите выходные данные Transform

Компонент Transform создает два вида выходных данных: график Tensorflow и преобразованные примеры. Преобразованные примеры относятся к типу артефакта Examples, который также создается ExampleGen, но вместо этого содержит преобразованные значения функций.

Вы можете изучить их, как мы это делали на предыдущем шаге.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous run of Transform component.
    artifacts = get_latest_artifacts(metadata_handler.store,
                                     PIPELINE_NAME, "Transform")
    # Find artifacts of Examples type.
    transformed_examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Examples.TYPE_NAME)
preview_examples(transformed_examples_artifacts)

Шаг 4. Обучите модель с помощью компонента Trainer.

Мы построим модель ML, используя компонент Trainer . Дополнительную информацию см. в руководстве по компоненту Trainer . Вам необходимо предоставить код модели для компонента Trainer.

Определите свою модель.

В шаблоне пингвина models.model.run_fn используется как аргумент run_fn для компонента Trainer . Это означает, что run_fn() в models/model.py будет вызываться при запуске компонента Trainer . Вы можете увидеть код для построения простой модели DNN с использованием keras API в данном коде. См. TensorFlow 2.x в руководстве по TFX для получения дополнительной информации об использовании keras API в TFX.

В этом run_fn вы должны построить модель и сохранить ее в каталог, указанный fn_args.serving_model_dir , который указан компонентом. Вы можете использовать другие аргументы в fn_args , которые передаются в run_fn . Полный список аргументов в fn_args . в соответствующих кодах.

Определите свои функции в models/features.py и используйте их по мере необходимости. Если вы преобразовали свои объекты на шаге 3, вы должны использовать преобразованные объекты в качестве входных данных для своей модели.

Добавьте компонент Trainer в конвейер.

Если ваш run_fn готов, добавьте компонент Trainer в конвейер.

  1. В файле pipeline/pipeline.py раскомментируйте # components.append(trainer) , чтобы добавить компонент в конвейер.

Аргументы для компонента тренера могут зависеть от того, используете ли вы компонент Transform или нет.

  • Если вы НЕ используете компонент Transform , вам не нужно изменять аргументы.
  • Если вы используете компонент Transform , вам необходимо изменить аргументы при создании экземпляра компонента Trainer .

    • Измените аргумент examples на examples=transform.outputs['transformed_examples'], . Нам нужно использовать преобразованные примеры для обучения.
    • Добавьте аргумент transform_graph , например, transform_graph=transform.outputs['transform_graph'], . Этот граф содержит граф TensorFlow для операций преобразования.
    • После вышеуказанных изменений код для создания компонента Trainer будет выглядеть следующим образом.
    # If you use a Transform component.
    trainer = Trainer(
        run_fn=run_fn,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        ...
    

Вы можете обновить конвейер и запустить его снова.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 7
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:01.173700221    5436 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 8
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886601629
last_update_time_since_epoch: 1643886601629
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 9
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886605023
last_update_time_since_epoch: 1643886605023
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 9 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 9
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Когда это выполнение выполняется успешно, вы создали и запустили свой первый конвейер TFX для своей модели. Поздравляем!

Ваша новая модель будет расположена в каком-то месте в выходном каталоге, но было бы лучше иметь модель в фиксированном месте или службе за пределами конвейера TFX, которая содержит много промежуточных результатов. Еще лучше с непрерывной оценкой построенной модели, которая имеет решающее значение в производственных системах машинного обучения. На следующем шаге мы увидим, как непрерывная оценка и развертывание работают в TFX.

Шаг 5. (Необязательно) Оцените модель с помощью Evaluator и опубликуйте с помощью pusher.

Компонент Evaluator постоянно оценивает каждую построенную модель из Trainer , а Pusher копирует модель в предопределенное место в файловой системе или даже в Google Cloud AI Platform Models .

Добавляет компонент Evaluator в конвейер.

В файле pipeline/pipeline.py :

  1. Раскомментируйте # components.append(model_resolver) , чтобы добавить последний преобразователь модели в конвейер. Evaluator можно использовать для сравнения модели со старой базовой моделью, которая прошла Evaluator при последнем запуске конвейера. LatestBlessedModelResolver находит последнюю модель, прошедшую проверку.
  2. Установите правильный tfma.MetricsSpec для вашей модели. Оценка может отличаться для каждой модели машинного обучения. В шаблоне пингвина использовалась SparseCategoricalAccuracy , потому что мы решаем проблему классификации нескольких категорий. Вам также необходимо указать tfma.SliceSpec для анализа вашей модели на наличие определенных срезов. Дополнительные сведения см. в руководстве по компоненту Evaluator .
  3. Раскомментируйте # components.append(evaluator) , чтобы добавить компонент в конвейер.

Вы можете обновить конвейер и запустить его снова.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 10
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:24.894390124    5584 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 10 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 10
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 11
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886625515
last_update_time_since_epoch: 1643886625515
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 11 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 11
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 12
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886628941
last_update_time_since_epoch: 1643886628941
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 12 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 12
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Изучите выходные данные Оценщика

Для этого шага требуется расширение Jupyter для ноутбука TensorFlow Model Analysis (TFMA). Обратите внимание, что версия расширения блокнота TFMA должна быть идентична версии пакета Python TFMA.

Следующая команда установит расширение ноутбука TFMA из реестра NPM. Это может занять несколько минут.

# Install TFMA notebook extension.
jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir]
               [--paths] [--json] [--debug]
               [subcommand]

Jupyter: Interactive Computing

positional arguments:
  subcommand     the subcommand to launch

optional arguments:
  -h, --help     show this help message and exit
  --version      show the versions of core jupyter packages and exit
  --config-dir   show Jupyter config dir
  --data-dir     show Jupyter data dir
  --runtime-dir  show Jupyter runtime dir
  --paths        show all Jupyter paths. Add --json for machine-readable
                 format.
  --json         output paths as machine-readable json
  --debug        output debug information about paths

Available subcommands: bundlerextension console dejavu execute kernel
kernelspec migrate nbconvert nbextension notebook qtconsole run
serverextension troubleshoot trust

Jupyter command `jupyter-labextension` not found.

Если установка завершена, перезагрузите браузер , чтобы расширение вступило в силу.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all aritfacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ModelEvaluation.TYPE_NAME)
if model_evaluation_artifacts:
  tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
  tfma.view.render_slicing_metrics(tfma_result)

Добавляет компонент Pusher в конвейер.

Если модель выглядит многообещающе, нам нужно опубликовать модель. Компонент Pusher может публиковать модель в файловой системе или в моделях платформы GCP AI с помощью специального исполнителя .

Компонент Evaluator постоянно оценивает каждую построенную модель из Trainer , а Pusher копирует модель в предопределенное место в файловой системе или даже в Google Cloud AI Platform Models .

  1. В local_runner.py установите SERVING_MODEL_DIR в каталог для публикации.
  2. В файле pipeline/pipeline.py раскомментируйте # components.append(pusher) , чтобы добавить Pusher в конвейер.

Вы можете обновить конвейер и запустить его снова.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 13
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:49.163841363    5734 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 13 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 13
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 14
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886649739
last_update_time_since_epoch: 1643886649739
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 14 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 14
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 15
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886653128
last_update_time_since_epoch: 1643886653128
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 15 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 15
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Вы сможете найти свою новую модель в SERVING_MODEL_DIR .

Шаг 6. (Необязательно) Разверните конвейер в Kubeflow Pipelines на GCP.

Как упоминалось ранее, local_runner.py хорош для целей отладки или разработки, но не является лучшим решением для рабочих нагрузок. На этом этапе мы развернем конвейер в Kubeflow Pipelines в Google Cloud.

Подготовка

Нам нужен пакет kfp python и программа skaffold для развертывания конвейера в кластере Kubeflow Pipelines.

pip install --upgrade -q kfp

# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold

Вам нужно переместить двоичный файл skaffold в то место, где его сможет найти ваша оболочка. Или вы можете указать путь к скаффолду при запуске двоичного файла tfx с --skaffold-cmd .

# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory

Вам также понадобится кластер Kubeflow Pipelines для запуска конвейера. Выполните шаги 1 и 2 в руководстве TFX on Cloud AI Platform Pipelines .

Когда ваш кластер будет готов, откройте панель мониторинга конвейеров, нажав Открыть панель мониторинга конвейеров на странице Pipelines облачной консоли Google . URL-адрес этой страницы — ENDPOINT для запроса запуска конвейера. Значением конечной точки является все, что находится в URL-адресе после https://, вплоть до googleusercontent.com включительно. Поместите конечную точку в следующий блок кода.

ENDPOINT='' # Enter your ENDPOINT here.

Чтобы запустить наш код в кластере Kubeflow Pipelines, нам нужно упаковать наш код в образ контейнера. Образ будет создан автоматически при развертывании нашего конвейера, и вам нужно будет только задать имя и реестр контейнеров для вашего образа. В нашем примере мы будем использовать реестр Google Container и назовем его tfx-pipeline .

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

Установить местоположение данных.

Ваши данные должны быть доступны из кластера Kubeflow Pipelines. Если вы использовали данные в своей локальной среде, вам может потребоваться загрузить их в удаленное хранилище, такое как Google Cloud Storage. Например, мы можем загрузить данные пингвина в корзину по умолчанию, которая создается автоматически при развертывании кластера Kubeflow Pipelines, как показано ниже.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]...
NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted

Обновите расположение данных, хранящихся в DATA_PATH в kubeflow_runner.py .

Если вы используете BigQueryExampleGen, нет необходимости загружать файл данных, но убедитесь, что kubeflow_runner.py использует тот же query и аргумент beam_pipeline_args для функции pipe.create_pipeline pipeline.create_pipeline() .

Разверните конвейер.

Если все готово, вы можете создать конвейер с помощью команды создания конвейера tfx pipeline create .

!tfx pipeline create  \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
CLI
[Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.

Теперь запустите выполнение с вновь созданным конвейером, используя команду tfx run create .

tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Failed to load kube config.
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 175, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 710, in urlopen
    chunked=chunked,
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 398, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 239, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/usr/lib/python3.7/http/client.py", line 1256, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1302, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1251, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1030, in _send_output
    self.send(msg)
  File "/usr/lib/python3.7/http/client.py", line 970, in send
    self.connect()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 205, in connect
    conn = self._new_conn()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 187, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/run.py", line 94, in create_run
    handler = handler_factory.create_handler(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/handler_factory.py", line 93, in create_handler
    return kubeflow_handler.KubeflowHandler(flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 62, in __init__
    namespace=self.flags_dict[labels.NAMESPACE])
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 197, in __init__
    if not self._context_setting['namespace'] and self.get_kfp_healthz(
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 411, in get_kfp_healthz
    response = self._healthz_api.get_healthz()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 63, in get_healthz
    return self.get_healthz_with_http_info(**kwargs)  # noqa: E501
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 148, in get_healthz_with_http_info
    collection_formats=collection_formats)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 369, in call_api
    _preload_content, _request_timeout, _host)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 185, in __call_api
    _request_timeout=_request_timeout)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 393, in request
    headers=headers)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 234, in GET
    query_params=query_params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 212, in request
    headers=headers)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 75, in request
    method, url, fields=fields, headers=headers, **urlopen_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 96, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
    **response_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
    **response_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
    **response_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 786, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))

Или вы также можете запустить конвейер на панели инструментов Kubeflow Pipelines. Новый запуск будет указан в разделе « Experiments » на панели инструментов Kubeflow Pipelines. Нажав на эксперимент, вы сможете отслеживать ход выполнения и визуализировать артефакты, созданные во время выполнения.

Если вы заинтересованы в запуске своего конвейера на Kubeflow Pipelines, найдите дополнительные инструкции в учебнике TFX on Cloud AI Platform Pipelines .

Убираться

Чтобы очистить все ресурсы Google Cloud, используемые на этом этапе, вы можете удалить проект Google Cloud, который вы использовали для руководства.

Кроме того, вы можете очистить отдельные ресурсы, посетив каждую консоль: