Crea una pipeline TFX per i tuoi dati con il modello Penguin


introduzione

Questo documento fornirà le istruzioni per creare una pipeline TensorFlow Extended (TFX) per il tuo set di dati utilizzando il modello pinguino fornito con il pacchetto TFX Python. Inizialmente, la pipeline creata utilizzerà il set di dati Palmer Penguins , ma trasformeremo la pipeline per il tuo set di dati.

Prerequisiti

  • Linux/MacOS
  • Python 3.6-3.8
  • Quaderno di Giove

Passaggio 1. Copia il modello predefinito nella directory del progetto.

In questo passaggio, creeremo una directory di progetto della pipeline funzionante e file copiando i file dal modello di pinguino in TFX. Puoi pensare a questo come a un'impalcatura per il tuo progetto di pipeline TFX.

Aggiorna Pip

Se stiamo eseguendo in Colab, dovremmo assicurarci di avere l'ultima versione di Pip. I sistemi locali possono ovviamente essere aggiornati separatamente.

import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip

Installa il pacchetto richiesto

Innanzitutto, installa TFX e TensorFlow Model Analysis (TFMA).

pip install -U tfx tensorflow-model-analysis

Controlliamo le versioni di TFX.

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx

print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0

Siamo pronti per creare una pipeline.

Imposta PROJECT_DIR sulla destinazione appropriata per il tuo ambiente. Il valore predefinito è ~/imported/${PIPELINE_NAME} , appropriato per l'ambiente Google Cloud AI Platform Notebook .

Puoi assegnare alla pipeline un nome diverso modificando PIPELINE_NAME di seguito. Questo diventerà anche il nome della directory del progetto in cui verranno inseriti i tuoi file.

PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)

Copia i file modello.

TFX include il modello penguin con il pacchetto python TFX. Il modello penguin contiene molte istruzioni per portare il tuo set di dati nella pipeline, che è lo scopo di questo tutorial.

Il tfx template copy CLI copia i file di modello predefiniti nella directory del progetto.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
CLI
Copying penguin pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py

Modificare il contesto della directory di lavoro in questo notebook nella directory del progetto.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Sfoglia i file di origine copiati

Il modello TFX fornisce file di scaffold di base per creare una pipeline, inclusi codice sorgente Python e dati di esempio. Il modello penguin utilizza lo stesso set di dati e modello ML di Palmer Penguins dell'esempio Penguin .

Ecco una breve introduzione a ciascuno dei file Python.

  • pipeline : questa directory contiene la definizione della pipeline
    • configs.py — definisce le costanti comuni per i corridori della pipeline
    • pipeline.py — definisce i componenti TFX e una pipeline
  • models : questa directory contiene le definizioni del modello ML
    • features.py , features_test.py — definisce le caratteristiche per il modello
    • preprocessing.py , preprocessing_test.py — definisce le routine di preelaborazione per i dati
    • constants.py — definisce le costanti del modello
    • model.py , model_test.py — definisce il modello ML utilizzando framework ML come TensorFlow
  • local_runner.py — definisce un corridore per l'ambiente locale che utilizza il motore di orchestrazione locale
  • kubeflow_runner.py — definisce un runner per il motore di orchestrazione di Kubeflow Pipelines

Per impostazione predefinita, il modello include solo componenti TFX standard. Se hai bisogno di alcune azioni personalizzate, puoi creare componenti personalizzati per la tua pipeline. Si prega di consultare la guida ai componenti personalizzati TFX per i dettagli.

File di unit test.

Potresti notare che ci sono alcuni file con _test.py nel loro nome. Si tratta di unit test della pipeline e si consiglia di aggiungere altri unit test man mano che si implementano le proprie pipeline. È possibile eseguire unit test fornendo il nome del modulo dei file di test con il flag -m . Di solito puoi ottenere un nome di modulo eliminando l'estensione .py e sostituendo / con . . Per esempio:

import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s
[       OK ] FeaturesTest.testLabelKey
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK (skipped=1)

Crea una pipeline TFX nell'ambiente locale.

TFX supporta diversi motori di orchestrazione per eseguire pipeline. Useremo il motore di orchestrazione locale. Il motore di orchestrazione locale viene eseguito senza ulteriori dipendenze ed è adatto per lo sviluppo e il debug poiché viene eseguito in ambiente locale anziché dipendere da cluster di elaborazione remoti.

Utilizzeremo local_runner.py per eseguire la pipeline utilizzando l'agente di orchestrazione locale. Devi creare una pipeline prima di eseguirla. È possibile creare una pipeline con il comando pipeline create .

tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.

Il comando pipeline create registra la pipeline definita in local_runner.py senza eseguirla effettivamente.

Eseguirai la pipeline creata con il comando run create nei passaggi seguenti.

Passaggio 2. Importa i TUOI dati nella pipeline.

La pipeline iniziale acquisisce il set di dati pinguino incluso nel modello. Devi inserire i tuoi dati nella pipeline e la maggior parte delle pipeline TFX inizia con il componente ExampleGen.

Scegli un esempioGen

I tuoi dati possono essere archiviati ovunque possa accedere la tua pipeline, su un filesystem locale o distribuito o su un sistema interrogabile. TFX fornisce vari componenti ExampleGen per portare i tuoi dati in una pipeline TFX. È possibile sceglierne uno dai seguenti esempi di generazione di componenti.

Puoi anche creare il tuo ExampleGen, ad esempio, tfx include un ExecampleGen personalizzato che utilizza Presto come origine dati. Consulta la guida per ulteriori informazioni su come utilizzare e sviluppare esecutori personalizzati.

Dopo aver deciso quale ExampleGen utilizzare, dovrai modificare la definizione della pipeline per utilizzare i tuoi dati.

  1. Modifica DATA_PATH in local_runner.py e impostalo sulla posizione dei tuoi file.

    • Se hai file nell'ambiente locale, specifica il percorso. Questa è l'opzione migliore per lo sviluppo o il debug di una pipeline.
    • Se i file sono archiviati in GCS, puoi utilizzare un percorso che inizia con gs://{bucket_name}/... . Assicurati di poter accedere a GCS dal tuo terminale, ad esempio usando gsutil . Se necessario, segui la guida all'autorizzazione in Google Cloud .
    • Se desideri utilizzare un ExampleGen basato su query come BigQueryExampleGen, hai bisogno di un'istruzione Query per selezionare i dati dall'origine dati. Ci sono alcune altre cose che devi impostare per utilizzare Google Cloud BigQuery come origine dati.
    • In pipeline/configs.py :
      • Modifica GOOGLE_CLOUD_PROJECT e GCS_BUCKET_NAME nel tuo progetto GCP e nel nome del bucket. Il bucket dovrebbe esistere prima di eseguire la pipeline.
      • BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS variabile BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS.
      • Decommenta e imposta la variabile BIG_QUERY_QUERY sulla tua istruzione di query .
    • In local_runner.py :
      • Commenta l'argomento data_path e decommenta l'argomento della query invece in pipeline.create_pipeline() .
    • In pipeline/pipeline.py :
      • Commenta l'argomento data_path e decommenta l'argomento della query in create_pipeline() .
      • Usa BigQueryExampleGen invece di CsvExampleGen.
  2. Sostituisci CsvExampleGen esistente nella tua classe ExampleGen in pipeline/pipeline.py . Ogni classe ExampleGen ha una firma diversa. Consulta la guida ai componenti di ExampleGen per maggiori dettagli. Non dimenticare di importare i moduli richiesti con le istruzioni di import in pipeline/pipeline.py .

La pipeline iniziale è composta da quattro componenti, ExampleGen , StatisticsGen , SchemaGen e ExampleValidator . Non è necessario modificare nulla per StatisticsGen , SchemaGen e ExampleValidator . Eseguiamo la pipeline per la prima volta.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:12.848598153    5127 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886553302
last_update_time_since_epoch: 1643886553302
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 2
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886556588
last_update_time_since_epoch: 1643886556588
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:12.120566"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:12.120566"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:12.120566')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Dovresti vedere "Component ExampleValidator è terminato". se la pipeline è stata eseguita correttamente.

Esaminare l'output della pipeline.

La pipeline TFX produce due tipi di output, artefatti e un DB di metadati (MLMD) che contiene metadati di artefatti ed esecuzioni di pipeline. La posizione dell'output è definita in local_runner.py . Per impostazione predefinita, gli artefatti vengono archiviati nella directory tfx_pipeline_output e i metadati vengono archiviati come database sqlite nella directory tfx_metadata .

È possibile utilizzare le API MLMD per esaminare questi output. In primo luogo, definiremo alcune funzioni di utilità per cercare gli artefatti di output che sono stati appena prodotti.

import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils

# TODO(b/171447278): Move these functions into TFX library.

def get_latest_executions(store, pipeline_name, component_id = None):
  """Fetch all pipeline runs."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
    ]
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
    ]
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)

def get_latest_artifacts(store, pipeline_name, component_id = None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)

  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  ]
  return store.get_artifacts_by_id(artifact_ids)

def find_latest_artifacts_by_type(store, artifacts, artifact_type):
  """Get the latest artifacts of a specified type."""
  # Get type information from MLMD
  try:
    artifact_type = store.get_artifact_type(artifact_type)
  except errors.NotFoundError:
    return []
  # Filter artifacts with type.
  filtered_artifacts = [aritfact for aritfact in artifacts
                        if aritfact.type_id == artifact_type.id]
  # Convert MLMD artifact data into TFX Artifact instances.
  return [artifact_utils.deserialize_artifact(artifact_type, artifact)
      for artifact in filtered_artifacts]


from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)

      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
                                        compression_type="GZIP")
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()
        example.ParseFromString(serialized_example)
        pp.pprint(example)

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(
              local_runner.METADATA_PATH)

Ora possiamo leggere i metadati degli artefatti di output da MLMD.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous pipeline run.
    artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
    # Find artifacts of Examples type.
    examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Examples.TYPE_NAME)
    # Find artifacts generated from StatisticsGen.
    stats_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleStatistics.TYPE_NAME)
    # Find artifacts generated from SchemaGen.
    schema_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Schema.TYPE_NAME)
    # Find artifacts generated from ExampleValidator.
    anomalies_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleAnomalies.TYPE_NAME)

Ora possiamo esaminare gli output di ciascun componente. Tensorflow Data Validation (TFDV) viene utilizzato in StatisticsGen , SchemaGen ed ExampleValidator e TFDV può essere utilizzato per visualizzare gli output di questi componenti.

In questo tutorial, utilizzeremo i metodi di supporto della visualizzazione in TFX che utilizzano TFDV internamente per mostrare la visualizzazione. Consulta il tutorial sui componenti TFX per saperne di più su ciascun componente.

Esaminare il modulo di output ExampleGen

Esaminiamo l'output di ExampleGen. Dai un'occhiata ai primi due esempi per ogni divisione:

preview_examples(examples_artifacts)

Per impostazione predefinita, TFX ExampleGen divide gli esempi in due suddivisioni, train ed eval , ma puoi modificare la configurazione della suddivisione .

Esaminare l'output di StatisticsGen

visualize_artifacts(stats_artifacts)

Queste statistiche vengono fornite a SchemaGen per costruire automaticamente uno schema di dati.

Esaminare l'output di SchemaGen

visualize_artifacts(schema_artifacts)

Questo schema viene automaticamente dedotto dall'output di StatisticsGen. Useremo questo schema generato in questo tutorial, ma puoi anche modificare e personalizzare lo schema .

Esaminare l'output da ExampleValidator

visualize_artifacts(anomalies_artifacts)

Se sono state rilevate anomalie, puoi rivedere i tuoi dati affinché tutti gli esempi seguano le tue ipotesi. Potrebbero essere utili gli output di altri componenti come StatistcsGen. Le anomalie rilevate non bloccano l'esecuzione della pipeline.

Puoi vedere le funzionalità disponibili dagli output di SchemaGen . Se le tue funzionalità possono essere utilizzate per costruire direttamente il modello ML in Trainer , puoi saltare il passaggio successivo e andare al passaggio 4. Altrimenti puoi eseguire un lavoro di ingegneria delle funzionalità nel passaggio successivo. Il componente Transform è necessario quando sono necessarie operazioni a passaggio completo come il calcolo delle medie, soprattutto quando è necessario ridimensionare.

Passaggio 3. (Facoltativo) Progettazione delle funzionalità con il componente Trasforma.

In questo passaggio, definirai vari lavori di progettazione delle funzionalità che verranno utilizzati dal componente Transform nella pipeline. Per ulteriori informazioni, vedere la guida ai componenti Trasforma .

Ciò è necessario solo se il codice di addestramento richiede funzionalità aggiuntive che non sono disponibili nell'output di ExampleGen. In caso contrario, sentiti libero di avanzare rapidamente al passaggio successivo dell'utilizzo di Trainer.

Definire le caratteristiche del modello

models/features.py contiene costanti per definire le caratteristiche per il modello inclusi i nomi delle funzioni, la dimensione del vocabolario e così via. Per impostazione predefinita, il modello penguin ha due costanti, FEATURE_KEYS e LABEL_KEY , perché il nostro modello penguin risolve un problema di classificazione utilizzando l'apprendimento supervisionato e tutte le funzionalità sono caratteristiche numeriche continue. Vedi le definizioni delle caratteristiche dall'esempio del taxi di Chicago per un altro esempio.

Implementare la preelaborazione per l'addestramento/servizio in preprocessing_fn().

L'effettiva ingegneria delle funzionalità avviene nella funzione preprocessing_fn() in models/preprocessing.py .

In preprocessing_fn puoi definire una serie di funzioni che manipolano il dict di input dei tensori per produrre il dict di tensori di output. Ci sono funzioni di supporto come scale_to_0_1 e compute_and_apply_vocabulary di trasformazione di TensorFlow oppure puoi semplicemente utilizzare le normali funzioni di TensorFlow. Per impostazione predefinita, il modello penguin include esempi di utilizzo della funzione tft.scale_to_z_score per normalizzare i valori delle caratteristiche.

Consulta la guida alla trasformazione di Tensflow per ulteriori informazioni sulla creazione di preprocessing_fn .

Aggiungi il componente Trasforma alla pipeline.

Se il tuo preprocessing_fn è pronto, aggiungi il componente Transform alla pipeline.

  1. Nel file pipeline/pipeline.py , decommenta # components.append(transform) per aggiungere il componente alla pipeline.

È possibile aggiornare la pipeline ed eseguirla di nuovo.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:09:37.596944686    5287 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886578210
last_update_time_since_epoch: 1643886578210
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886581527
last_update_time_since_epoch: 1643886581527
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:09:37.055994"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:09:37.055994"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:09:37.055994')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Se la pipeline è stata eseguita correttamente, dovresti vedere "La trasformazione del componente è terminata". da qualche parte nel registro. Poiché il componente Transform e il componente ExampleValidator non dipendono l'uno dall'altro, l'ordine delle esecuzioni non è fisso. Detto questo, sia Transform che ExampleValidator possono essere l'ultimo componente nell'esecuzione della pipeline.

Esaminare l'output di Trasforma

Il componente Trasforma crea due tipi di output, un grafico Tensorflow ed esempi trasformati. Gli esempi trasformati sono un tipo di artefatto Esempi prodotto anche da ExampleGen, ma questo contiene invece valori di funzionalità trasformati.

Puoi esaminarli come abbiamo fatto nel passaggio precedente.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all aritfacts from the previous run of Transform component.
    artifacts = get_latest_artifacts(metadata_handler.store,
                                     PIPELINE_NAME, "Transform")
    # Find artifacts of Examples type.
    transformed_examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Examples.TYPE_NAME)
preview_examples(transformed_examples_artifacts)

Passaggio 4. Allena il tuo modello con il componente Trainer.

Costruiremo un modello ML utilizzando il componente Trainer . Per ulteriori informazioni, vedere la guida ai componenti Trainer . È necessario fornire il codice del modello al componente Trainer.

Definisci il tuo modello.

Nel modello pinguino, models.model.run_fn viene utilizzato come argomento run_fn per il componente Trainer . Significa che la funzione run_fn() in models/model.py verrà chiamata quando il componente Trainer viene eseguito. Puoi vedere il codice per costruire un semplice modello DNN usando l'API keras nel codice specificato. Consulta TensorFlow 2.x nella guida TFX per ulteriori informazioni sull'utilizzo dell'API keras in TFX.

In questo run_fn , dovresti creare un modello e salvarlo in una directory indicata da fn_args.serving_model_dir specificata dal componente. Puoi usare altri argomenti in fn_args che viene passato a run_fn . Vedere i codici correlati per l'elenco completo degli argomenti in fn_args .

Definisci le tue funzionalità in models/features.py e usale secondo necessità. Se hai trasformato le tue feature nel passaggio 3, dovresti usare le feature trasformate come input per il tuo modello.

Aggiungi il componente Trainer alla pipeline.

Se il tuo run_fn è pronto, aggiungi il componente Trainer alla pipeline.

  1. Nel file pipeline/pipeline.py , decommentare # components.append(trainer) per aggiungere il componente alla pipeline.

Gli argomenti per il componente trainer potrebbero dipendere dall'utilizzo o meno del componente Transform.

  • Se NON si utilizza il componente Transform , non è necessario modificare gli argomenti.
  • Se utilizzi il componente Transform , devi modificare gli argomenti durante la creazione di un'istanza del componente Trainer .

    • Modifica l'argomento degli examples in examples=transform.outputs['transformed_examples'], . Abbiamo bisogno di usare esempi trasformati per la formazione.
    • Aggiungi un argomento transform_graph come transform_graph=transform.outputs['transform_graph'], . Questo grafico contiene il grafico TensorFlow per le operazioni di trasformazione.
    • Dopo le modifiche precedenti, il codice per la creazione del componente Trainer apparirà come segue.
    # If you use a Transform component.
    trainer = Trainer(
        run_fn=run_fn,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        ...
    

È possibile aggiornare la pipeline ed eseguirla di nuovo.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 7
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:01.173700221    5436 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 8
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886601629
last_update_time_since_epoch: 1643886601629
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 8 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 8
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 9
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886605023
last_update_time_since_epoch: 1643886605023
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:00.469382"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:00.469382"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:00.469382')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 9 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 9
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Quando questa esecuzione viene eseguita correttamente, ora hai creato ed eseguito la tua prima pipeline TFX per il tuo modello. Congratulazioni!

Il tuo nuovo modello si troverà in un punto nella directory di output, ma sarebbe meglio avere un modello in posizione fissa o un servizio al di fuori della pipeline TFX che contiene molti risultati temporanei. Ancora meglio con la valutazione continua del modello costruito, che è fondamentale nei sistemi di produzione ML. Vedremo come funzionano la valutazione continua e le implementazioni in TFX nel passaggio successivo.

Passaggio 5. (Facoltativo) Valuta il modello con Evaluator e pubblicalo con pusher.

Il componente Evaluator valuta continuamente ogni modello creato da Trainer e Pusher copia il modello in una posizione predefinita nel file system o persino nei modelli di piattaforma AI di Google Cloud .

Aggiunge il componente Valutatore alla pipeline.

Nel file pipeline/pipeline.py :

  1. Decommenta # components.append(model_resolver) per aggiungere l'ultimo risolutore del modello alla pipeline. Valutatore può essere utilizzato per confrontare un modello con il vecchio modello di base che ha superato Valutatore nell'ultima esecuzione della pipeline. LatestBlessedModelResolver trova l'ultimo modello che ha superato Valutatore.
  2. Imposta tfma.MetricsSpec corretto per il tuo modello. La valutazione potrebbe essere diversa per ogni modello ML. Nel modello pinguino è stato utilizzato SparseCategoricalAccuracy perché stiamo risolvendo un problema di classificazione a più categorie. È inoltre necessario specificare tfma.SliceSpec per analizzare il modello per sezioni specifiche. Per maggiori dettagli, vedere la guida ai componenti del valutatore .
  3. Decommentare # components.append(evaluator) per aggiungere il componente alla pipeline.

È possibile aggiornare la pipeline ed eseguirla di nuovo.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 10
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:24.894390124    5584 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 10 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 10
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 11
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886625515
last_update_time_since_epoch: 1643886625515
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 11 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 11
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 12
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886628941
last_update_time_since_epoch: 1643886628941
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:24.358660"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:24.358660"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:24.358660')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 12 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 12
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Esaminare l'output di Evaluator

Questo passaggio richiede l'estensione del notebook Jupyter TensorFlow Model Analysis (TFMA). Nota che la versione dell'estensione per notebook TFMA dovrebbe essere identica alla versione del pacchetto Python TFMA.

Il comando seguente installerà l'estensione del notebook TFMA dal registro NPM. Potrebbero essere necessari diversi minuti per il completamento.

# Install TFMA notebook extension.
jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir]
               [--paths] [--json] [--debug]
               [subcommand]

Jupyter: Interactive Computing

positional arguments:
  subcommand     the subcommand to launch

optional arguments:
  -h, --help     show this help message and exit
  --version      show the versions of core jupyter packages and exit
  --config-dir   show Jupyter config dir
  --data-dir     show Jupyter data dir
  --runtime-dir  show Jupyter runtime dir
  --paths        show all Jupyter paths. Add --json for machine-readable
                 format.
  --json         output paths as machine-readable json
  --debug        output debug information about paths

Available subcommands: bundlerextension console dejavu execute kernel
kernelspec migrate nbconvert nbextension notebook qtconsole run
serverextension troubleshoot trust

Jupyter command `jupyter-labextension` not found.

Se l'installazione è completata, ricaricare il browser per rendere effettiva l'estensione.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all aritfacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ModelEvaluation.TYPE_NAME)
if model_evaluation_artifacts:
  tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
  tfma.view.render_slicing_metrics(tfma_result)

Aggiunge il componente Pusher alla pipeline.

Se il modello sembra promettente, dobbiamo pubblicarlo. Il componente Pusher può pubblicare il modello in una posizione nel filesystem o nei modelli GCP AI Platform utilizzando un executor personalizzato .

Il componente Evaluator valuta continuamente ogni modello creato da Trainer e Pusher copia il modello in una posizione predefinita nel file system o persino nei modelli di piattaforma AI di Google Cloud .

  1. In local_runner.py , imposta SERVING_MODEL_DIR su una directory da pubblicare.
  2. Nel file pipeline/pipeline.py , decommentare # components.append(pusher) per aggiungere Pusher alla pipeline.

È possibile aggiornare la pipeline ed eseguirla di nuovo.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 13
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}), exec_properties={'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
          base_type: DATASET
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
E0203 11:10:49.163841363    5734 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 13 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}) for execution 13
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 14
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886649739
last_update_time_since_epoch: 1643886649739
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
base_type: DATASET
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
    base_type: PROCESS
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
            base_type: DATASET
          }
        }
        output_key: "examples"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          base_type: STATISTICS
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 14 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}) for execution 14
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 15
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14
type_id: 17
uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
state: LIVE
create_time_since_epoch: 1643886653128
last_update_time_since_epoch: 1643886653128
, artifact_type: id: 17
name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
base_type: STATISTICS
)]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
  }
}
, artifact_type: name: "Schema"
)]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.schema_gen.component.SchemaGen"
    base_type: PROCESS
  }
  id: "SchemaGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-02-03T11:10:48.556314"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.SchemaGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "statistics"
    value {
      channels {
        producer_node_query {
          id: "StatisticsGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2022-02-03T11:10:48.556314"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.StatisticsGen"
            }
          }
        }
        artifact_query {
          type {
            name: "ExampleStatistics"
            base_type: STATISTICS
          }
        }
        output_key: "statistics"
      }
      min_count: 1
    }
  }
}
outputs {
  outputs {
    key: "schema"
    value {
      artifact_spec {
        type {
          name: "Schema"
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
  parameters {
    key: "infer_feature_shape"
    value {
      field_value {
        int_value: 1
      }
    }
  }
}
upstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2022-02-03T11:10:48.556314')
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 15 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.6.0"
  }
}
, artifact_type: name: "Schema"
)]}) for execution 15
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component SchemaGen is finished.

Dovresti riuscire a trovare il tuo nuovo modello su SERVING_MODEL_DIR .

Passaggio 6. (Facoltativo) Distribuisci la pipeline a Kubeflow Pipelines su GCP.

Come accennato in precedenza, local_runner.py è utile per scopi di debug o sviluppo, ma non è la soluzione migliore per i carichi di lavoro di produzione. In questo passaggio, implementeremo la pipeline in Kubeflow Pipelines su Google Cloud.

Preparazione

Abbiamo bisogno del pacchetto python kfp e del programma skaffold per distribuire una pipeline in un cluster Kubeflow Pipelines.

pip install --upgrade -q kfp

# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold

Devi spostare il binario skaffold nel punto in cui la tua shell può trovarlo. Oppure puoi specificare il percorso di skaffold quando esegui tfx binary con --skaffold-cmd flag.

# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory

È inoltre necessario un cluster Kubeflow Pipelines per eseguire la pipeline. Segui i passaggi 1 e 2 in TFX on Cloud AI Platform Pipelines tutorial .

Quando il tuo cluster è pronto, apri il dashboard della pipeline facendo clic su Apri dashboard Pipelines nella pagina Pipelines della console cloud di Google . L'URL di questa pagina è ENDPOINT per richiedere un'esecuzione della pipeline. Il valore dell'endpoint è tutto nell'URL dopo https://, fino a googleusercontent.com incluso. Metti il ​​tuo endpoint nel seguente blocco di codice.

ENDPOINT='' # Enter your ENDPOINT here.

Per eseguire il nostro codice in un cluster Kubeflow Pipelines, dobbiamo comprimere il nostro codice in un'immagine del contenitore. L'immagine verrà creata automaticamente durante la distribuzione della nostra pipeline e devi solo impostare un nome e un registro contenitori per la tua immagine. Nel nostro esempio, utilizzeremo Google Container Registry e lo tfx-pipeline .

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

Imposta la posizione dei dati.

I tuoi dati dovrebbero essere accessibili dal cluster Kubeflow Pipelines. Se hai utilizzato i dati nel tuo ambiente locale, potrebbe essere necessario caricarli su un archivio remoto come Google Cloud Storage. Ad esempio, possiamo caricare i dati del pinguino in un bucket predefinito che viene creato automaticamente quando un cluster Kubeflow Pipelines viene distribuito come segue.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]...
NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted

Aggiorna la posizione dei dati archiviata in DATA_PATH in kubeflow_runner.py .

Se utilizzi BigQueryExampleGen, non è necessario caricare il file di dati, ma assicurati che kubeflow_runner.py utilizzi la stessa query e l'argomento beam_pipeline_args per la funzione pipeline.create_pipeline() .

Distribuire la pipeline.

Se tutto è pronto, puoi creare una pipeline usando il comando tfx pipeline create .

!tfx pipeline create  \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
CLI
[Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.

Ora avvia un'esecuzione con la pipeline appena creata utilizzando il comando tfx run create .

tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Failed to load kube config.
Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 175, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 710, in urlopen
    chunked=chunked,
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 398, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 239, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/usr/lib/python3.7/http/client.py", line 1256, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1302, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1251, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1030, in _send_output
    self.send(msg)
  File "/usr/lib/python3.7/http/client.py", line 970, in send
    self.connect()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 205, in connect
    conn = self._new_conn()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connection.py", line 187, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmpfs/src/tf_docs_env/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/commands/run.py", line 94, in create_run
    handler = handler_factory.create_handler(ctx.flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/handler_factory.py", line 93, in create_handler
    return kubeflow_handler.KubeflowHandler(flags_dict)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 62, in __init__
    namespace=self.flags_dict[labels.NAMESPACE])
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 197, in __init__
    if not self._context_setting['namespace'] and self.get_kfp_healthz(
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp/_client.py", line 411, in get_kfp_healthz
    response = self._healthz_api.get_healthz()
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 63, in get_healthz
    return self.get_healthz_with_http_info(**kwargs)  # noqa: E501
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 148, in get_healthz_with_http_info
    collection_formats=collection_formats)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 369, in call_api
    _preload_content, _request_timeout, _host)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 185, in __call_api
    _request_timeout=_request_timeout)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 393, in request
    headers=headers)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 234, in GET
    query_params=query_params)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/kfp_server_api/rest.py", line 212, in request
    headers=headers)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 75, in request
    method, url, fields=fields, headers=headers, **urlopen_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/request.py", line 96, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
    **response_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
    **response_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 826, in urlopen
    **response_kw
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/connectionpool.py", line 786, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))

Oppure puoi anche eseguire la pipeline nel dashboard di Kubeflow Pipelines. La nuova corsa sarà elencata in Experiments nel dashboard di Kubeflow Pipelines. Facendo clic sull'esperimento sarà possibile monitorare l'avanzamento e visualizzare gli artefatti creati durante l'esecuzione.

Se sei interessato a eseguire la tua pipeline su Kubeflow Pipelines, trova ulteriori istruzioni nel tutorial TFX on Cloud AI Platform Pipelines .

Pulire

Per ripulire tutte le risorse Google Cloud utilizzate in questo passaggio, puoi eliminare il progetto Google Cloud che hai utilizzato per il tutorial.

In alternativa, puoi ripulire le singole risorse visitando ciascuna console: