Create a TFX pipeline for your data with the Penguin template


This document walks through creating a TensorFlow Extended (TFX) pipeline for your own dataset using the penguin template, which ships with the TFX Python package. The pipeline you create will initially use the Palmer Penguins dataset, and we will then adapt it to your dataset.

Prerequisites

  • Linux / MacOS
  • Python 3.6-3.8
  • Jupyter notebook
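
As a quick sanity check before continuing, the running interpreter can be compared against the Python range listed above. This is a minimal sketch; the helper name is_supported_python is made up for illustration and is not part of TFX.

```python
import sys

# Supported range taken from the prerequisites list above.
MIN_VERSION = (3, 6)
MAX_VERSION = (3, 8)


def is_supported_python(version=None):
    """Return True if (major, minor) lies within the supported range."""
    if version is None:
        version = sys.version_info[:2]
    return MIN_VERSION <= tuple(version[:2]) <= MAX_VERSION


print("Running Python {}.{}".format(*sys.version_info[:2]))
print("Supported by this tutorial:", is_supported_python())
```

Passing an explicit tuple, such as is_supported_python((3, 7)), checks a version other than the one currently running.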

Step 1. Copy the predefined template to your project directory.

In this step, we create a working pipeline project directory and files by copying files from the penguin template in TFX. You can think of this as scaffolding for your TFX pipeline project.

Upgrade Pip

If we are running in Colab, we should make sure we have the latest version of Pip. Local systems can of course be updated separately.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

Install required package

First, install TFX and TensorFlow Model Analysis (TFMA).

pip install -U tfx tensorflow-model-analysis

Let's check the versions of TFX.

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx

print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0

We are ready to create a pipeline.

Set PROJECT_DIR to an appropriate destination for your environment. The default value is ~/imported/${PIPELINE_NAME}, which is appropriate for a Google Cloud AI Platform Notebook environment.

You may give your pipeline a different name by changing PIPELINE_NAME below. This also becomes the name of the project directory where your files are placed.

import os
# Pipeline name; it is also used as the project directory name.
PIPELINE_NAME="my_pipeline"
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)

Copy template files.

TFX includes the penguin template with the TFX Python package. The penguin template contains many instructions for bringing your dataset into the pipeline, which is the purpose of this tutorial.

The tfx template copy CLI command copies predefined template files into your project directory.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
%env PATH={PATH}:/home/jupyter/.local/bin

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
Copying penguin pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py

Change the working directory context in this notebook to the project directory.
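
In a notebook this is typically done with the %cd magic. Outside a notebook, a plain-Python sketch of the same step could look like the following. It assumes the PIPELINE_NAME and PROJECT_DIR values defined earlier; the helper enter_project_dir is hypothetical and only changes directory if the project directory exists.

```python
import os

# Assumed values, mirroring the definitions earlier in this tutorial.
PIPELINE_NAME = "my_pipeline"
PROJECT_DIR = os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)


def enter_project_dir(project_dir):
    """Change into project_dir if it exists; return True on success."""
    if not os.path.isdir(project_dir):
        print("Project directory not found:", project_dir)
        return False
    os.chdir(project_dir)
    print("Working directory:", os.getcwd())
    return True


enter_project_dir(PROJECT_DIR)
```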


Browse your copied source files

The TFX template provides basic scaffold files to build a pipeline, including Python source code and sample data. The penguin template uses the same Palmer Penguins dataset and ML model as the Penguin example.

Here is a brief introduction to each of the Python files.

  • pipeline - This directory contains the definition of the pipeline
    • configs.py - defines common constants for pipeline runners
    • pipeline.py - defines TFX components and a pipeline
  • models - This directory contains ML model definitions
    • features.py, features_test.py - defines features for the model
    • preprocessing.py, preprocessing_test.py - defines preprocessing routines for data
    • constants.py - defines constants of the model
    • model.py, model_test.py - defines the ML model using ML frameworks like TensorFlow
  • local_runner.py - defines a runner for the local environment, which uses the local orchestration engine
  • kubeflow_runner.py - defines a runner for the Kubeflow Pipelines orchestration engine

By default, the template only includes standard TFX components. If you need some customized actions, you can create custom components for your pipeline. See the TFX custom component guide for details.

Unit Test Files.

You might notice that there are some files with _test.py in their name. These are unit tests of the pipeline, and it is recommended to add more unit tests as you implement your own pipelines. You can run unit tests by supplying the module name of a test file with the -m flag. You can usually get a module name by deleting the .py extension and replacing / with . (a dot). For example:

import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s
[       OK ] FeaturesTest.testLabelKey
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
Ran 2 tests in 0.001s

OK (skipped=1)
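
The rule for deriving a module name from a test file path can be sketched as a small helper. The function name path_to_module is hypothetical and used only for illustration; it handles simple relative paths such as the ones in this project.

```python
import os


def path_to_module(path):
    """Convert a relative .py file path into a dotted module name."""
    without_ext, ext = os.path.splitext(path)
    if ext != ".py":
        raise ValueError("expected a .py file, got: {}".format(path))
    # Normalize OS-specific separators, then replace them with dots.
    return without_ext.replace(os.sep, "/").replace("/", ".")


print(path_to_module("models/features_test.py"))
```

The resulting name is what you pass to python -m, as in the command above.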

Create a TFX pipeline in local environment.

TFX supports several orchestration engines to run pipelines. We will use the local orchestration engine. The local orchestration engine runs without any further dependencies, and it is suitable for development and debugging because it runs in the local environment rather than depending on remote computing clusters.

We will use local_runner.py to run your pipeline with the local orchestrator. You have to create a pipeline before running it. You can create a pipeline with the pipeline create command.

tfx pipeline create --engine=local --pipeline_path=local_runner.py
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.

The pipeline create command registers the pipeline defined in local_runner.py without actually running it.

You will run the created pipeline with the run create command in the following steps.

Step 2. Ingest YOUR data to the pipeline.

The initial pipeline ingests the penguin dataset which is included in the template. You need to put your own data into the pipeline, and most TFX pipelines start with an ExampleGen component.

Choose an ExampleGen

Your data can be stored anywhere your pipeline can access, on either a local or distributed filesystem, or in a query-able system. TFX provides various ExampleGen components to bring your data into a TFX pipeline. Choose the one that matches your data source, for example CsvExampleGen for CSV files or BigQueryExampleGen for data in Google Cloud BigQuery.
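
To make the choice concrete, here is a rough sketch that maps a file extension to a candidate ExampleGen component. The helper suggest_example_gen and its extension table are assumptions made for illustration only; the component names are standard TFX ones, and Avro or Parquet data is usually read through FileBasedExampleGen with a custom executor.

```python
import os

# Illustrative mapping; extend it to match how your data is stored.
EXAMPLE_GEN_BY_EXTENSION = {
    ".csv": "CsvExampleGen",
    ".tfrecord": "ImportExampleGen",
    ".avro": "FileBasedExampleGen (with an Avro executor)",
    ".parquet": "FileBasedExampleGen (with a Parquet executor)",
}


def suggest_example_gen(file_name):
    """Suggest an ExampleGen component name for a data file, or None."""
    _, ext = os.path.splitext(file_name.lower())
    return EXAMPLE_GEN_BY_EXTENSION.get(ext)


for name in ["penguins.csv", "data.tfrecord", "notes.txt"]:
    print(name, "->", suggest_example_gen(name))
```

Data that lives in a query-able system rather than in files, such as BigQuery, does not fit this table and calls for a query-based ExampleGen instead.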

You can also create your own ExampleGen; for example, tfx includes a custom ExampleGen which uses Presto as a data source. See the guide for more information on how to use and develop custom executors.

Once you decide which ExampleGen to use, you will need to modify the pipeline definition to use your data.

  1. Modify the DATA_PATH in local_runner.py and set it to the location of your files.

    • If you have files in a local environment, specify the path. This is the best option for developing or debugging a pipeline.
    • If the files are stored in GCS, you can use a path starting with gs://{bucket_name}/... . Make sure that you can access GCS from your terminal, for example, using gsutil. Follow the authorization guide in Google Cloud if needed.
    • If you want to use a query-based ExampleGen like BigQueryExampleGen, you need a query statement to select data from the data source. There are a few more things you need to set to use Google Cloud BigQuery as a data source.
      • In pipeline/configs.py:
        • Change GOOGLE_CLOUD_PROJECT and GCS_BUCKET_NAME to your GCP project and bucket name. The bucket should exist before we run the pipeline.
        • Uncomment the BIG_QUERY_QUERY variable and set it to your query statement.
      • In local_runner.py:
        • Comment out the data_path argument and uncomment the query argument in pipeline.create_pipeline().
      • In pipeline/pipeline.py:
        • Comment out the data_path argument and uncomment the query argument in create_pipeline().
        • Use BigQueryExampleGen instead of CsvExampleGen.
  2. Replace the existing CsvExampleGen with your ExampleGen class in pipeline/pipeline.py. Each ExampleGen class has a different signature. See the ExampleGen component guide for more details. Don't forget to import the required modules with import statements in pipeline/pipeline.py.
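
For the BigQuery option described in step 1, the relevant settings in pipeline/configs.py might look like the sketch below. The project ID, bucket name, dataset, and table are placeholders, not values from the template; replace them with your own.

```python
# Hypothetical values; replace with your own GCP project and bucket.
GOOGLE_CLOUD_PROJECT = "my-gcp-project"
GCS_BUCKET_NAME = "my-gcs-bucket"

# Query handed to a query-based ExampleGen such as BigQueryExampleGen.
# The dataset and table names below are placeholders.
BIG_QUERY_QUERY = """
    SELECT *
    FROM `{project}.my_dataset.my_table`
""".format(project=GOOGLE_CLOUD_PROJECT)

print(BIG_QUERY_QUERY)
```

Keeping the project ID in one variable and formatting it into the query avoids the two drifting apart when you switch projects.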

The initial pipeline consists of four components: ExampleGen, StatisticsGen, SchemaGen and ExampleValidator. We don't need to change anything for StatisticsGen, SchemaGen and ExampleValidator. Let's run the pipeline for the first time.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:12.848598153    5127 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886553302
last_update_time_since_epoch: 1643886553302
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886556588
last_update_time_since_epoch: 1643886556588
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

You should see the message "Component ExampleValidator is finished." if the pipeline ran successfully.

Examine the output of the pipeline.

A TFX pipeline produces two kinds of output: artifacts and a metadata database (MLMD), which contains metadata about the artifacts and the pipeline executions. The output locations are defined in local_runner.py. By default, artifacts are stored under the tfx_pipeline_output directory, and metadata is stored as an SQLite database under the tfx_metadata directory.
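To get a quick feel for the artifact layout before turning to MLMD, the directory tree can be walked directly. The sketch below assumes the layout suggested by the URIs in the run log, such as ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3, that is `<pipeline root>/<component>/<output key>/<execution id>`. The helper name `list_artifact_dirs` is hypothetical and is not part of TFX.

```python
from pathlib import Path


def list_artifact_dirs(pipeline_root):
    """Return (component, output_key, execution_id) tuples found under a pipeline root.

    Assumes the layout <root>/<component>/<output_key>/<execution_id>,
    as suggested by the artifact URIs printed in the run log.
    """
    root = Path(pipeline_root)
    found = []
    if not root.is_dir():
        # Nothing has been written yet (or the path is wrong).
        return found
    for component_dir in sorted(p for p in root.iterdir() if p.is_dir()):
        for key_dir in sorted(p for p in component_dir.iterdir() if p.is_dir()):
            # Skip bookkeeping directories such as `.system`.
            if key_dir.name.startswith("."):
                continue
            for exec_dir in sorted(p for p in key_dir.iterdir() if p.is_dir()):
                found.append((component_dir.name, key_dir.name, exec_dir.name))
    return found
```

Calling `list_artifact_dirs("./tfx_pipeline_output/my_pipeline")` from the project directory would then list one tuple per artifact directory. This only inspects the file system; the MLMD queries below remain the reliable way to find which artifacts belong to which run.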

You can use the MLMD APIs to examine these outputs. First, we will define some utility functions to search for the output artifacts that were just produced.

import os

import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils

# TODO(b/171447278): Move these functions into TFX library.

def get_latest_executions(store, pipeline_name, component_id = None):
  """Fetch all pipeline runs."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
    ]
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
    ]
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)

def get_latest_artifacts(store, pipeline_name, component_id = None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)

  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  ]
  return store.get_artifacts_by_id(artifact_ids)

def find_latest_artifacts_by_type(store, artifacts, artifact_type):
  """Get the latest artifacts of a specified type."""
  # Get type information from MLMD.
  try:
    artifact_type = store.get_artifact_type(artifact_type)
  except errors.NotFoundError:
    return []
  # Keep only the artifacts of the requested type.
  filtered_artifacts = [artifact for artifact in artifacts
                        if artifact.type_id == artifact_type.id]
  # Convert MLMD artifact data into TFX Artifact instances.
  return [artifact_utils.deserialize_artifact(artifact_type, artifact)
          for artifact in filtered_artifacts]

from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

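from tfx.orchestration.experimental.interactive import standard_visualizations
# Register the built-in visualizations so that get_visualization() can find them.
standard_visualizations.register_standard_visualizations()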

import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)

      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
                                        compression_type="GZIP")
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()
        example.ParseFromString(serialized_example)
        pp.pprint(example)

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(
    local_runner.METADATA_PATH)

Now we can read the metadata of the output artifacts from MLMD.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all artifacts from the previous pipeline run.
    artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
    # Find artifacts of Examples type.
    examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Examples.TYPE_NAME)
    # Find artifacts generated from StatisticsGen.
    stats_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleStatistics.TYPE_NAME)
    # Find artifacts generated from SchemaGen.
    schema_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Schema.TYPE_NAME)
    # Find artifacts generated from ExampleValidator.
    anomalies_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleAnomalies.TYPE_NAME)

Now we can examine the outputs from each component. TensorFlow Data Validation (TFDV) is used in StatisticsGen, SchemaGen, and ExampleValidator, and TFDV can be used to visualize the outputs from these components.

In this tutorial, we will use the visualization helper methods in TFX, which use TFDV internally to show the visualizations. Please see the TFX components tutorial to learn more about each component.

Examine the output from ExampleGen

Let's examine the output from ExampleGen. Take a look at the first two examples for each split:

preview_examples(examples_artifacts)

By default, TFX ExampleGen divides examples into two splits, train and eval, but you can adjust your split configuration.
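The default split is visible in the `output_config` parameter printed in the run log: `train` gets 2 hash buckets and `eval` gets 1. As a rough illustration, the sketch below computes the expected share of examples per split from that JSON, assuming each example is hashed uniformly into one of the buckets. The function name `split_fractions` is hypothetical; actual split sizes will only approximate these fractions.

```python
import json
from fractions import Fraction

# The `output_config` value as it appears in the run log of this tutorial.
OUTPUT_CONFIG = """
{
  "split_config": {
    "splits": [
      {"hash_buckets": 2, "name": "train"},
      {"hash_buckets": 1, "name": "eval"}
    ]
  }
}
"""


def split_fractions(output_config_json):
    """Map each split name to its expected fraction of the examples."""
    splits = json.loads(output_config_json)["split_config"]["splits"]
    total_buckets = sum(split["hash_buckets"] for split in splits)
    return {
        split["name"]: Fraction(split["hash_buckets"], total_buckets)
        for split in splits
    }


fractions_by_split = split_fractions(OUTPUT_CONFIG)
```

With the default configuration this gives two thirds of the data to `train` and one third to `eval`. Changing the bucket counts in your own configuration changes the ratio accordingly.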

Examine the output from StatisticsGen

visualize_artifacts(stats_artifacts)

These statistics are supplied to SchemaGen to construct a schema of the data automatically.

Examine the output from SchemaGen

visualize_artifacts(schema_artifacts)

This schema is inferred automatically from the output of StatisticsGen. We will use this generated schema in this tutorial, but you can also modify and customize the schema.

Examine the output from ExampleValidator

visualize_artifacts(anomalies_artifacts)

If any anomalies were found, you may review your data to check that all examples follow your assumptions. Outputs from other components such as StatisticsGen might be useful. Found anomalies do not block pipeline execution.

You can see the available features from the outputs of SchemaGen. If your features can be used to construct an ML model in Trainer directly, you can skip the next step and go to Step 4. Otherwise, you can do some feature engineering work in the next step. The Transform component is needed when full-pass operations such as calculating averages are required, especially when you need to scale.

Step 3. (Optional) Feature engineering with the Transform component.

In this step, you will define the various feature engineering jobs that will be used by the Transform component in the pipeline. See the Transform component guide for more information.

This is only necessary if your training code requires additional features that are not available in the output of ExampleGen. Otherwise, feel free to fast-forward to the next step of using Trainer.

Define features of the model

models/features.py contains constants that define features for the model, including feature names, vocabulary size, and so on. By default, the penguin template has two constants, FEATURE_KEYS and LABEL_KEY, because our penguin model solves a classification problem using supervised learning and all features are continuous numeric features. See the feature definitions from the chicago taxi example for another example.
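For orientation, a features module for the penguin data might look roughly like the sketch below. The two constant names come from the text above; the column names and the `transformed_name` helper with its `_xf` suffix are assumptions made for illustration, so check the models/features.py generated in your project for the actual values.

```python
# A sketch of what models/features.py could contain.
# Column names are assumed from the Palmer Penguins dataset; verify them
# against the header of your own CSV file.
FEATURE_KEYS = [
    'culmen_length_mm',
    'culmen_depth_mm',
    'flipper_length_mm',
    'body_mass_g',
]

# The column holding the class label to predict (assumed name).
LABEL_KEY = 'species'


def transformed_name(key):
    """Return the name used for a feature after preprocessing (assumed convention)."""
    return key + '_xf'
```

When you switch to your own dataset, these constants are the first thing to update, since both the preprocessing code and the model code read the feature names from this module.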

Implement preprocessing for training / serving in preprocessing_fn().

The actual feature engineering happens in the preprocessing_fn() function in models/preprocessing.py.

In preprocessing_fn you can define a series of functions that manipulate the input dict of tensors to produce the output dict of tensors. There are helper functions such as scale_to_0_1 and compute_and_apply_vocabulary in the TensorFlow Transform API, or you can simply use regular TensorFlow functions. By default, the penguin template includes example usages of the tft.scale_to_z_score function to normalize feature values.
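To see why z-score scaling is a full-pass operation, the plain-Python sketch below normalizes one feature column: the mean and standard deviation must be computed over the whole dataset before any single value can be transformed. This only illustrates the arithmetic and is not how TensorFlow Transform is implemented; the function name `z_score_full_pass`, the use of population variance, and the handling of a constant column are assumptions of this sketch.

```python
import math


def z_score_full_pass(values):
    """Scale values to zero mean and unit variance using whole-dataset statistics."""
    count = len(values)
    # First pass over the data: compute the mean.
    mean = sum(values) / count
    # Second pass: compute the (population) variance around that mean.
    variance = sum((value - mean) ** 2 for value in values) / count
    std = math.sqrt(variance)
    if std == 0:
        # A constant column carries no information; map it to zeros (assumption).
        return [0.0 for _ in values]
    # Only now can each individual value be transformed.
    return [(value - mean) / std for value in values]


scaled = z_score_full_pass([1.0, 2.0, 3.0])
```

The Transform component performs this kind of analysis once over the training data and stores the resulting constants in the transform graph, so the same mean and standard deviation are applied at training and at serving time.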

See the TensorFlow Transform guide for more information about authoring preprocessing_fn.

Add the Transform component to the pipeline.

If your preprocessing_fn is ready, add the Transform component to the pipeline.

  1. In the pipeline/pipeline.py file, uncomment # components.append(transform) to add the component to the pipeline.

You can update the pipeline and run it again.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:37.596944686    5287 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886578210
last_update_time_since_epoch: 1643886578210
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886581527
last_update_time_since_epoch: 1643886581527
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

If the pipeline ran successfully, you should see "Component Transform is finished." somewhere in the log. Because the Transform component and the ExampleValidator component do not depend on each other, the order of their execution is not fixed. That said, either Transform or ExampleValidator can be the last component in the pipeline execution.
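The ordering remark can be checked with a small dependency sketch. The graph below is an assumption based on the upstream and downstream nodes printed in the run log plus the usual inputs of Transform (examples and schema) and ExampleValidator (statistics and schema); the helper names are hypothetical.

```python
# Each component mapped to the components it consumes outputs from (assumed graph).
UPSTREAM = {
    "CsvExampleGen": [],
    "StatisticsGen": ["CsvExampleGen"],
    "SchemaGen": ["StatisticsGen"],
    "ExampleValidator": ["StatisticsGen", "SchemaGen"],
    "Transform": ["CsvExampleGen", "SchemaGen"],
}


def terminal_nodes(upstream):
    """Return the components that no other component depends on."""
    has_downstream = {dep for deps in upstream.values() for dep in deps}
    return sorted(node for node in upstream if node not in has_downstream)


def depends_on(upstream, node, target):
    """Return True if `node` depends on `target`, directly or transitively."""
    stack = list(upstream[node])
    seen = set()
    while stack:
        current = stack.pop()
        if current == target:
            return True
        if current not in seen:
            seen.add(current)
            stack.extend(upstream[current])
    return False


last_candidates = terminal_nodes(UPSTREAM)
```

Here `last_candidates` contains both ExampleValidator and Transform, and neither depends on the other, so a runner is free to finish them in either order.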

Examine the output from Transform

The Transform component creates two kinds of output: a TensorFlow graph and transformed examples. The transformed examples are of the Examples artifact type, which is also produced by ExampleGen, but these contain the transformed feature values instead.

You can examine them as we did in the previous step.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all artifacts from the previous run of the Transform component.
    artifacts = get_latest_artifacts(metadata_handler.store,
                                     PIPELINE_NAME, "Transform")
    # Find artifacts of Examples type.
    transformed_examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,

Step 4. Train your model with the Trainer component.

We will build an ML model using the Trainer component. See the Trainer component guide for more information. You need to provide your model code to the Trainer component.

Define your model.

In the penguin template, models.model.run_fn is used as the run_fn argument of the Trainer component. This means that the run_fn() function in models/model.py is called when the Trainer component runs. The provided code constructs a simple DNN model with the Keras API. See the TensorFlow 2.x in TFX guide for more information about using the Keras API in TFX.

In this run_fn, you should build a model and save it to the directory pointed to by fn_args.serving_model_dir, which is specified by the component. You can also use the other arguments in fn_args, which is passed into run_fn. See the related code for the full list of arguments in fn_args.
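
The contract described above (receive fn_args, write the result under fn_args.serving_model_dir) can be sketched with the standard library alone. This is only a stand-in: it "trains" a trivial majority-class predictor and writes a JSON file, whereas the template's run_fn builds and saves a Keras model. The `train_labels` field and the `model.json` file name are hypothetical and exist only in this sketch.

```python
import json
import os
import tempfile
from types import SimpleNamespace


def run_fn(fn_args):
    """Stand-in for run_fn in models/model.py: fit something, then save it."""
    # "Training": pick the most frequent label (ties go to the smallest label).
    labels = fn_args.train_labels  # hypothetical field, used only in this sketch
    majority = max(sorted(set(labels)), key=labels.count)

    # The requirement from the text: the result goes under serving_model_dir.
    os.makedirs(fn_args.serving_model_dir, exist_ok=True)
    model_path = os.path.join(fn_args.serving_model_dir, "model.json")
    with open(model_path, "w") as f:
        json.dump({"always_predict": majority}, f)


# Stand-in for the object the Trainer component would pass to run_fn.
fn_args = SimpleNamespace(
    serving_model_dir=os.path.join(tempfile.mkdtemp(), "serving_model"),
    train_labels=[0, 1, 1, 2, 1],
)
run_fn(fn_args)

with open(os.path.join(fn_args.serving_model_dir, "model.json")) as f:
    saved_model = json.load(f)
print(saved_model)
```

The design point is that run_fn does not choose its own output location; it writes wherever the component tells it to, which lets downstream components find the model.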

Define your features in models/features.py and use them as needed. If you transformed your features in Step 3, you should use the transformed features as inputs to your model.
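
As a rough sketch of what such a features module might contain, the block below lists feature names and derives the names of their transformed counterparts. The column names follow the Palmer Penguins dataset, while the `_xf` suffix and the `model_input_keys` helper are assumptions made for illustration; use whatever names your own Transform code produces.

```python
# Hypothetical contents for models/features.py; replace with your own columns.
FEATURE_KEYS = [
    "culmen_length_mm",
    "culmen_depth_mm",
    "flipper_length_mm",
    "body_mass_g",
]
LABEL_KEY = "species"

# Assumed naming convention for features that went through Transform.
TRANSFORMED_SUFFIX = "_xf"


def transformed_name(key):
    """Return the name a feature is assumed to have after transformation."""
    return key + TRANSFORMED_SUFFIX


def model_input_keys(use_transform):
    """Pick the feature names the model should read as inputs."""
    if use_transform:
        return [transformed_name(key) for key in FEATURE_KEYS]
    return list(FEATURE_KEYS)


print(model_input_keys(use_transform=True))
```

Keeping the naming rule in one helper means the preprocessing code and the model code cannot drift apart on feature names.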

Add the Trainer component to the pipeline.

Once your run_fn is ready, add the Trainer component to the pipeline.

  1. In the pipeline/pipeline.py file, uncomment # components.append(trainer) to add the component to the pipeline.

The arguments for the Trainer component depend on whether you use the Transform component.

  • If you do NOT use the Transform component, you don't need to change the arguments.
  • If you use the Transform component, you need to change the arguments when creating the Trainer component instance.

    • Change the examples argument to examples=transform.outputs['transformed_examples'] so that training uses the transformed examples.
    • Add the transform_graph argument, for example transform_graph=transform.outputs['transform_graph']. This artifact contains the TensorFlow graph for the transform operations.
    • After these changes, the code that creates the Trainer component will look like the following.
    # If you use a Transform component.
    trainer = Trainer(
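
The choice between the two cases can also be sketched without TFX itself. In the block below, plain dictionaries of strings stand in for the outputs attribute of each component, and `trainer_input_kwargs` is a hypothetical helper. The fallback to the ExampleGen output when Transform is absent is an assumption about the template's default.

```python
def trainer_input_kwargs(example_gen_outputs, transform_outputs=None):
    """Return the Trainer keyword arguments that depend on Transform usage."""
    if transform_outputs is None:
        # Assumed default: train directly on the examples from ExampleGen.
        return {"examples": example_gen_outputs["examples"]}
    # With Transform: train on transformed examples and pass the graph along.
    return {
        "examples": transform_outputs["transformed_examples"],
        "transform_graph": transform_outputs["transform_graph"],
    }


# Plain strings stand in for the real output channels.
example_gen_outputs = {"examples": "raw-examples"}
transform_outputs = {
    "transformed_examples": "transformed-examples",
    "transform_graph": "transform-graph",
}

without_transform = trainer_input_kwargs(example_gen_outputs)
with_transform = trainer_input_kwargs(example_gen_outputs, transform_outputs)
print(without_transform)
print(with_transform)
```

In a real pipeline the resulting dictionary would be unpacked into the Trainer constructor alongside the arguments that do not change, such as run_fn.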

You can update the pipeline and run it again.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 7
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:01.173700221    5436 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 8
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886601629
last_update_time_since_epoch: 1643886601629
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 9
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886605023
last_update_time_since_epoch: 1643886605023
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 9 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 9
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Once this run completes successfully, you have created and run your first TFX pipeline for your model. Congratulations!

Your new model is stored somewhere under the output directory, but it is better to keep the model in a fixed location or service outside the TFX pipeline output, which holds many interim results. It is better still to evaluate the built model continuously, which is critical in production ML systems. The next step shows how continuous evaluation and deployment work in TFX.

Step 5. (Optional) Evaluate the model with Evaluator and publish it with Pusher.

The Evaluator component continuously evaluates every model built by Trainer, and Pusher copies the model to a predefined location in the file system or even to Google Cloud AI Platform Models.

Add the Evaluator component to the pipeline.

In the pipeline/pipeline.py file:

  1. Uncomment # components.append(model_resolver) to add the latest model resolver to the pipeline. Evaluator can compare a model with the old baseline model that passed Evaluator in the last pipeline run. LatestBlessedModelResolver finds the latest model that passed Evaluator.
  2. Set a proper tfma.MetricsSpec for your model. Evaluation may differ for every ML model. The penguin template uses SparseCategoricalAccuracy because we are solving a multi-class classification problem. You also need to specify tfma.SlicingSpec to analyze your model for specific slices. For more detail, see the Evaluator component guide.
  3. Uncomment # components.append(evaluator) to add the component to the pipeline.
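
To make the baseline comparison, the metric threshold and the slicing from the list above more concrete, here is a simplified sketch of the kind of decision an evaluation step makes. It is not TFMA's implementation: `accuracy_by_slice`, `is_blessed`, the 0.6 lower bound and the `max_regression` tolerance are all hypothetical and chosen only for illustration.

```python
from collections import defaultdict


def accuracy_by_slice(rows, slice_key):
    """Compute accuracy separately for each value of one slicing feature."""
    hits, totals = defaultdict(int), defaultdict(int)
    for row in rows:
        totals[row[slice_key]] += 1
        hits[row[slice_key]] += int(row["label"] == row["prediction"])
    return {value: hits[value] / totals[value] for value in totals}


def is_blessed(candidate_accuracy, lower_bound,
               baseline_accuracy=None, max_regression=0.0):
    """Accept a model that clears the bound and does not regress too far."""
    if candidate_accuracy < lower_bound:
        return False
    if baseline_accuracy is not None:
        # Compare against the last accepted model, if there is one.
        if candidate_accuracy < baseline_accuracy - max_regression:
            return False
    return True


rows = [
    {"island": "Biscoe", "label": 0, "prediction": 0},
    {"island": "Biscoe", "label": 1, "prediction": 0},
    {"island": "Dream", "label": 2, "prediction": 2},
]
per_slice = accuracy_by_slice(rows, "island")
first_run = is_blessed(0.9, lower_bound=0.6)
regressed = is_blessed(0.8, lower_bound=0.6, baseline_accuracy=0.85)
print(per_slice, first_run, regressed)
```

On a first run there is no baseline yet, so only the absolute bound applies; later runs must also hold up against the previously accepted model.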

You can update the pipeline and run it again.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 10
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:24.894390124    5584 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 10 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 10
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 11
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886625515
last_update_time_since_epoch: 1643886625515
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 11 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 11
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 12
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886628941
last_update_time_since_epoch: 1643886628941
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 12 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 12
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Examine the Evaluator output

This step requires the TensorFlow Model Analysis (TFMA) Jupyter notebook extension. Note that the version of the TFMA notebook extension must be identical to the version of the TFMA Python package.

The following command installs the TFMA notebook extension from the NPM registry. It might take several minutes to complete.

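# Install TFMA notebook extension.
# The leading "!" runs the command in a shell from the notebook, so that
# {tfma.__version__} is substituted with the installed TFMA version.
!jupyter labextension install tensorflow_model_analysis@{tfma.__version__}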
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir]
               [--paths] [--json] [--debug]

Jupyter: Interactive Computing

positional arguments:
  subcommand     the subcommand to launch

optional arguments:
  -h, --help     show this help message and exit
  --version      show the versions of core jupyter packages and exit
  --config-dir   show Jupyter config dir
  --data-dir     show Jupyter data dir
  --runtime-dir  show Jupyter runtime dir
  --paths        show all Jupyter paths. Add --json for machine-readable
  --json         output paths as machine-readable json
  --debug        output debug information about paths

Available subcommands: bundlerextension console dejavu execute kernel
kernelspec migrate nbconvert nbextension notebook qtconsole run
serverextension troubleshoot trust

Jupyter command `jupyter-labextension` not found.

If the installation completes, reload your browser so that the extension takes effect.
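The command output shown in this section ends with "Jupyter command `jupyter-labextension` not found", which suggests the JupyterLab command-line tools were not available in that environment. The following is a minimal sketch, using only the Python standard library, of how one might check for the executable and build the version-pinned package spec before attempting the install. The helper names `tfma_extension_spec` and `labextension_available` are hypothetical and not part of TFX or TFMA.

```python
import shutil


def tfma_extension_spec(tfma_version: str) -> str:
    """Return the npm package spec pinned to the given TFMA Python version."""
    return f"tensorflow_model_analysis@{tfma_version}"


def labextension_available() -> bool:
    """Return True if the `jupyter-labextension` executable is on PATH."""
    return shutil.which("jupyter-labextension") is not None


if __name__ == "__main__":
    # 0.37.0 is the TFMA version printed earlier in this tutorial;
    # in a notebook you would pass tfma.__version__ instead.
    spec = tfma_extension_spec("0.37.0")
    if labextension_available():
        print(f"Run: jupyter labextension install {spec}")
    else:
        print("jupyter-labextension was not found on PATH; "
              "JupyterLab is probably not installed in this environment.")
```

Pinning the spec to the Python package version keeps the notebook extension and the `tensorflow_model_analysis` package in step, which is the requirement stated above.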

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ModelEvaluation.TYPE_NAME)

if model_evaluation_artifacts:
  # Load the most recent evaluation result and render it in the notebook.
  tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
  tfma.view.render_slicing_metrics(tfma_result)
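To make the lookup above more concrete, here is a simplified, standard-library-only sketch of selecting the newest artifact of a given type. It does not call ML Metadata. `ArtifactRecord` and `latest_by_type` are hypothetical stand-ins, and the sketch assumes that a higher artifact id means a more recent artifact, as the ids in the logs of this section (10, 11, 13) suggest. The type name "ModelEvaluation" is the artifact type emitted by the TFX Evaluator.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ArtifactRecord:
    """Hypothetical stand-in for an ML Metadata artifact record."""
    id: int
    type_name: str
    uri: str


def latest_by_type(artifacts: List[ArtifactRecord],
                   type_name: str) -> List[ArtifactRecord]:
    """Return artifacts of `type_name`, newest (highest id) first."""
    matching = [a for a in artifacts if a.type_name == type_name]
    return sorted(matching, key=lambda a: a.id, reverse=True)


# Ids, types, and URIs copied from the log output in this section.
records = [
    ArtifactRecord(10, "Examples",
                   "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"),
    ArtifactRecord(11, "ExampleStatistics",
                   "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"),
    ArtifactRecord(13, "Examples",
                   "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"),
]

newest_examples = latest_by_type(records, "Examples")
evaluations = latest_by_type(records, "ModelEvaluation")

print(newest_examples[0].uri)
print(len(evaluations))
```

In this sample no "ModelEvaluation" record exists, so the second lookup returns an empty list. That is the case the `if model_evaluation_artifacts:` guard protects against when Evaluator has not yet run.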

Add the Pusher component to the pipeline

If the model looks promising, we need to publish it. The Pusher component can publish the model to a location in the file system or to GCP AI Platform Models using a custom executor.

The Evaluator component continuously evaluates every model built by Trainer, and Pusher copies the model to a predefined location in the file system or even to Google Cloud AI Platform Models.

  1. In local_runner.py, set SERVING_MODEL_DIR to the directory to publish to.
  2. In pipeline/pipeline.py, uncomment # components.append(pusher) to add Pusher to the pipeline.
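As a rough illustration of what publishing to a file system location amounts to, the sketch below copies a model directory into a versioned subdirectory of a serving directory, and only does so when the model has been approved ("blessed") by evaluation. This is a conceptual stand-in written with the standard library, not the TFX Pusher implementation. The function `push_if_blessed`, the directory names, and the version strings are all hypothetical.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Optional


def push_if_blessed(model_dir: Path, blessed: bool,
                    serving_dir: Path, version: str) -> Optional[Path]:
    """Copy `model_dir` to `serving_dir/version` only when the model is blessed."""
    if not blessed:
        return None  # Nothing is published for a model that failed evaluation.
    destination = serving_dir / version
    # copytree creates missing parent directories, including serving_dir.
    shutil.copytree(model_dir, destination)
    return destination


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)

    # A placeholder "trained model" directory with a single file.
    model_dir = root / "trained_model"
    model_dir.mkdir()
    (model_dir / "saved_model.pb").write_bytes(b"placeholder")

    serving_dir = root / "serving_model"
    skipped = push_if_blessed(model_dir, blessed=False,
                              serving_dir=serving_dir, version="1")
    pushed = push_if_blessed(model_dir, blessed=True,
                             serving_dir=serving_dir, version="2")
    pushed_files = sorted(p.name for p in pushed.iterdir())
    print(skipped, pushed_files)
```

Keeping each pushed model in its own versioned subdirectory means an earlier version stays on disk when a newer one is published, which makes rolling back straightforward.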

You can now update the pipeline and run it again.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 13
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  id: "CsvExampleGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          properties {
            key: "version"
            value: INT
          base_type: DATASET
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:49.163841363    5734 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 13 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}) for execution 13
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 14
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
custom_properties {
  key: "span"
  value {
    int_value: 0
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886649739
last_update_time_since_epoch: 1643886649739
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
properties {
  key: "version"
  value: INT
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  id: "StatisticsGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
        output_key: "examples"
      min_count: 1
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          properties {
            key: "split_names"
            value: STRING
          base_type: STATISTICS
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 14 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}) for execution 14
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 15
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
state: LIVE
create_time_since_epoch: 1643886653128
last_update_time_since_epoch: 1643886653128
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
properties {
  key: "split_names"
  value: STRING
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  id: "SchemaGen"
contexts {
  contexts {
    type {
      name: "pipeline"
    name {
      field_value {
        string_value: "my_pipeline"
  contexts {
    type {
      name: "pipeline_run"
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
  contexts {
    type {
      name: "node"
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        context_queries {
          type {
            name: "pipeline"
          name {
            field_value {
              string_value: "my_pipeline"
        context_queries {
          type {
            name: "pipeline_run"
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
        context_queries {
          type {
            name: "node"
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
        output_key: "statistics"
      min_count: 1
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 15 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
, artifact_type: name: "Schema"
)]}) for execution 15
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

You should be able to find your new model at SERVING_MODEL_DIR.

Step 6. (Optional) Deploy your pipeline to Kubeflow Pipelines on GCP.

As mentioned earlier, local_runner.py is good for debugging or development purposes but is not the best solution for production workloads. In this step, we will deploy the pipeline to Kubeflow Pipelines on Google Cloud.


We need the kfp Python package and the skaffold program to deploy a pipeline to a Kubeflow Pipelines cluster.

pip install --upgrade -q kfp

# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold

You need to move the skaffold binary to a place where your shell can find it. Alternatively, you can specify the path to skaffold with the --skaffold-cmd flag when you run the tfx binary.

# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory
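The `mv` output above shows that the move fails when the destination directory does not exist. The following is a minimal sketch, not part of the template, of how you might check from Python whether `skaffold` is discoverable on your PATH before deploying. The helper name `is_on_path` is hypothetical.

```python
import shutil


def is_on_path(program: str) -> bool:
    """Return True if `program` resolves to an executable on the current PATH."""
    return shutil.which(program) is not None


if not is_on_path("skaffold"):
    # Either move the binary to a directory on PATH, or pass its location
    # to the tfx CLI with --skaffold-cmd.
    print("skaffold not found on PATH")
```

If the check fails, create the destination directory first or choose a directory that is already on your PATH.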

You also need a Kubeflow Pipelines cluster to run the pipeline. Follow Steps 1 and 2 in the TFX on Cloud AI Platform Pipelines tutorial.

When your cluster is ready, open the pipeline dashboard by clicking Open Pipelines Dashboard on the Pipelines page of the Google Cloud console. The URL of this page is the ENDPOINT used to request a pipeline run. The endpoint value is everything in the URL after https://, up to and including googleusercontent.com. Put your endpoint in the following code block.

ENDPOINT='' # Enter your ENDPOINT here.
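If you prefer to derive the endpoint from the dashboard URL rather than trimming it by hand, a sketch along these lines may help. It assumes the dashboard URL has the shape described above. The function name `endpoint_from_dashboard_url` and the example URL are hypothetical.

```python
from urllib.parse import urlparse


def endpoint_from_dashboard_url(url: str) -> str:
    """Return the host part of the dashboard URL, i.e. everything after
    https:// up to and including googleusercontent.com."""
    host = urlparse(url).netloc
    if not host.endswith("googleusercontent.com"):
        raise ValueError(f"Unexpected dashboard host: {host!r}")
    return host


# Hypothetical dashboard URL, for illustration only.
example_url = "https://1234abcd-dot-us-central1.pipelines.googleusercontent.com/#/pipelines"
ENDPOINT = endpoint_from_dashboard_url(example_url)
print(ENDPOINT)
```

The check on the host suffix is a guard against pasting an unrelated URL. Adjust it if your cluster is served from a different domain.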

To run our code in a Kubeflow Pipelines cluster, we need to pack our code into a container image. The image will be built automatically while deploying our pipeline, so you only need to set a name and a container registry for the image. In our example, we will use Google Container Registry and name the image tfx-pipeline.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
# The first line of the command output is the project id.
GOOGLE_CLOUD_PROJECT=shell_output[0]

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

Set the data location.

Your data should be accessible from the Kubeflow Pipelines cluster. If you have used data in your local environment, you might need to upload it to remote storage such as Google Cloud Storage. For example, we can upload the penguin data to the default bucket, which is created automatically when a Kubeflow Pipelines cluster is deployed, as follows.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]...
NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted

Update the data location stored in DATA_PATH in kubeflow_runner.py.
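As a rough sketch, the new DATA_PATH value could be built from the project id so that it matches the destination of the gsutil command above. The helper `default_data_path` and the project id `my-gcp-project` are hypothetical. The bucket naming follows the `{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default` pattern used in this step, and the bucket only exists once the cluster has been deployed.

```python
def default_data_path(project_id: str) -> str:
    """Build the GCS directory that holds the uploaded CSV file."""
    # Default bucket name pattern used in this tutorial step.
    bucket = f"{project_id}-kubeflowpipelines-default"
    return f"gs://{bucket}/tfx-template/data/penguin/"


# Hypothetical project id, for illustration only.
DATA_PATH = default_data_path("my-gcp-project")
print(DATA_PATH)
```

The path points at the directory, not at data.csv itself, which mirrors the destination used in the upload command.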

If you are using BigQueryExampleGen, there is no need to upload the data file, but make sure that kubeflow_runner.py uses the same query and beam_pipeline_args arguments for the pipeline.create_pipeline() function.

Deploy the pipeline.

If everything is ready, you can create a pipeline using the tfx pipeline create command.

!tfx pipeline create  \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-image

Note: the --build-target-image flag has been removed. Specify the build target image in the `KubeflowDagRunnerConfig` class instead, and use the --build-image flag without an argument to build a container image when creating or updating a pipeline.

Now start an execution run with the newly created pipeline using the tfx run create command.

tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
Creating a run for pipeline: my_pipeline
Failed to load kube config.
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 175, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 85, in create_connection
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 710, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 398, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 239, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/usr/lib/python3.7/http/client.py", line 1256, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1302, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1251, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1030, in _send_output
  File "/usr/lib/python3.7/http/client.py", line 970, in send
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 205, in connect
    conn = self._new_conn()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 187, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/run.py", line 94, in create_run
    handler = handler_factory.create_handler(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/handler_factory.py", line 93, in create_handler
    return kubeflow_handler.KubeflowHandler(flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 62, in __init__
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 197, in __init__
    if not self._context_setting['namespace'] and self.get_kfp_healthz(
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 411, in get_kfp_healthz
    response = self._healthz_api.get_healthz()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 63, in get_healthz
    return self.get_healthz_with_http_info(**kwargs)  # noqa: E501
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 148, in get_healthz_with_http_info
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 369, in call_api
    _preload_content, _request_timeout, _host)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 185, in __call_api
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 393, in request
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 234, in GET
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 212, in request
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 75, in request
    method, url, fields=fields, headers=headers, **urlopen_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 96, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 786, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))
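The traceback above ends with a refused connection to localhost on port 80, which is consistent with the command being run without a reachable endpoint (an assumption, since the cause is not stated in the output). A small guard such as the following sketch can catch an empty ENDPOINT before the CLI is invoked. The helper `build_run_command` is hypothetical; the flags are the ones used in the command above.

```python
from typing import List


def build_run_command(pipeline_name: str, endpoint: str) -> List[str]:
    """Assemble the `tfx run create` command, refusing an empty endpoint."""
    if not endpoint:
        raise ValueError("ENDPOINT is empty; set it to your Kubeflow Pipelines host.")
    return [
        "tfx", "run", "create",
        "--engine=kubeflow",
        f"--pipeline-name={pipeline_name}",
        f"--endpoint={endpoint}",
    ]


# Hypothetical values, for illustration only.
cmd = build_run_command("my_pipeline", "abc.pipelines.googleusercontent.com")
print(" ".join(cmd))
```

The resulting list can be passed to `subprocess.run(cmd, check=True)` if you want to launch the run from a script instead of a notebook cell.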

Alternatively, you can run the pipeline from the Kubeflow Pipelines dashboard. The new run will be listed under Experiments in the Kubeflow Pipelines dashboard. Clicking into the experiment lets you monitor progress and visualize the artifacts created during the execution run.

If you are interested in running your pipeline on Kubeflow Pipelines, you can find more instructions in the TFX on Cloud AI Platform Pipelines tutorial.


To clean up all Google Cloud resources used in this step, you can delete the Google Cloud project you used for the tutorial.

Alternatively, you can clean up individual resources by visiting each console: