Introduction
This document provides instructions for creating a TensorFlow Extended (TFX) pipeline for your own dataset using the penguin template that ships with the TFX Python package. The pipeline you create will initially use the Palmer Penguins dataset, but we will transform it into a pipeline for your own dataset.
Prerequisites
- Linux / macOS
- Python 3.6-3.8
- Jupyter notebook
Step 1. Copy the predefined template to your project directory.
In this step, we will create a working pipeline project directory and its files by copying files from the penguin template in TFX. You can think of this as scaffolding for your TFX pipeline project.
Upgrade Pip
If we are running in Colab, we should make sure that we have the latest version of Pip. Local systems can, of course, be upgraded separately.
import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip
Install required package
First, install TFX and TensorFlow Model Analysis (TFMA).
pip install -U tfx tensorflow-model-analysis
Let's check the installed versions of TensorFlow, TFMA, and TFX.
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx
print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1
TFMA version: 0.37.0
TFX version: 1.6.0
We are ready to create a pipeline.
Set PROJECT_DIR to an appropriate destination for your environment. The default value is ~/imported/${PIPELINE_NAME}, which is appropriate for the Google Cloud AI Platform Notebook environment.
You can give your pipeline a different name by changing PIPELINE_NAME below. This will also become the name of the project directory where your files will be put.
PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)
Copy template files.
TFX includes the penguin template with the TFX Python package. The penguin template contains many instructions for bringing your dataset into the pipeline, which is the purpose of this tutorial.
The tfx template copy CLI command copies the predefined template files into your project directory.
# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
!tfx template copy \
--pipeline-name={PIPELINE_NAME} \
--destination-path={PROJECT_DIR} \
--model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
CLI
Copying penguin pipeline template
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
Change the working directory context in this notebook to the project directory.
%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline
Browse your copied source files
The TFX template provides basic scaffold files to build a pipeline, including Python source code and sample data. The penguin template uses the same Palmer Penguins dataset and ML model as the Penguin example.
Here is a brief introduction to each of the Python files.
- pipeline: this directory contains the definition of the pipeline
  - configs.py: defines common constants for pipeline runners
  - pipeline.py: defines TFX components and a pipeline
- models: this directory contains ML model definitions
  - features.py, features_test.py: define features for the model
  - preprocessing.py, preprocessing_test.py: define preprocessing routines for data
  - constants.py: defines constants of the model
  - model.py, model_test.py: define the ML model using ML frameworks like TensorFlow
- local_runner.py: defines a runner for the local environment, which uses the local orchestration engine
- kubeflow_runner.py: defines a runner for the Kubeflow Pipelines orchestration engine
By default, the template only includes standard TFX components. If you need some customized actions, you can create custom components for your pipeline. See the TFX custom component guide for details.
Unit test files.
You might notice that some files have _test.py in their name. These are unit tests of the pipeline, and it is recommended to add more unit tests as you implement your own pipelines. You can run unit tests by supplying the module name of a test file with the -m flag. You can usually get a module name by deleting the .py extension and replacing / with . (a dot). For example:
import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s
[ OK ] FeaturesTest.testLabelKey
[ RUN ] FeaturesTest.test_session
[ SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.001s
OK (skipped=1)
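The path-to-module rule described above can be sketched in a few lines of Python. The helper name module_name_from_path is hypothetical (it is not part of TFX), and the sketch assumes a path relative to the project directory with forward slashes.

```python
from pathlib import PurePosixPath


def module_name_from_path(path):
  """Convert a relative .py file path into a dotted module name."""
  p = PurePosixPath(path)
  if p.suffix != ".py":
    raise ValueError("not a Python source file: {}".format(path))
  # Drop the ".py" extension, then join the path components with dots.
  return ".".join(p.with_suffix("").parts)


print(module_name_from_path("models/features_test.py"))
```

The printed name is what you would pass after the -m flag.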
Create a TFX pipeline in local environment.
TFX supports several orchestration engines to run pipelines. We will use the local orchestration engine. It runs without any further dependencies, and it is suitable for development and debugging because it runs in the local environment rather than depending on remote computing clusters.
We will use local_runner.py to run your pipeline using the local orchestrator. You have to create a pipeline before running it. You can create a pipeline with the pipeline create command.
tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.
The pipeline create command registers your pipeline defined in local_runner.py without actually running it.
You will run the created pipeline with the run create command in the following steps.
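To make the difference between registering and running concrete, here is a small sketch that assembles both CLI invocations as argument lists. The helper tfx_cli_command is hypothetical; the flags (--engine, --pipeline_path, --pipeline_name) are the ones used by the commands in this tutorial, and my_pipeline is the example pipeline name.

```python
def tfx_cli_command(group, action, **flags):
  """Build the argument list for a `tfx <group> <action>` invocation."""
  args = ["tfx", group, action]
  for name, value in flags.items():
    args.append("--{}={}".format(name, value))
  return args


# Registers the pipeline definition; nothing is executed yet.
create_cmd = tfx_cli_command(
    "pipeline", "create", engine="local", pipeline_path="local_runner.py")
# Starts an actual run of the pipeline registered above.
run_cmd = tfx_cli_command(
    "run", "create", engine="local", pipeline_name="my_pipeline")

print(" ".join(create_cmd))
print(" ".join(run_cmd))
```

Outside a notebook, such a list could be passed to subprocess.run(cmd, check=True) from the project directory.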
Step 2. Ingest YOUR data to the pipeline.
The initial pipeline ingests the penguin dataset that is included in the template. You need to put your data into the pipeline, and most TFX pipelines start with an ExampleGen component.
Choose an ExampleGen
Your data can be stored anywhere your pipeline can access, on either a local or a distributed filesystem, or in a queryable system. TFX provides various ExampleGen components to bring your data into a TFX pipeline. You can choose one of the following example-generating components.
- CsvExampleGen: Reads CSV files in a directory. Used in the penguin example and the Chicago taxi example.
- ImportExampleGen: Takes TFRecord files in the TF Example data format. Used in the MNIST examples.
- FileBasedExampleGen for Avro or Parquet format.
- BigQueryExampleGen: Reads data in Google Cloud BigQuery directly. Used in the Chicago taxi examples.
You can also create your own ExampleGen. For example, TFX includes a custom ExampleGen that uses Presto as a data source. See the guide for more information on how to use and develop custom executors.
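As a rough aid for the choice above, the following sketch maps a data source to the name of a matching ExampleGen. The helper suggest_example_gen and its extension table are hypothetical heuristics written for this illustration, not a TFX API; real datasets (for example, compressed TFRecord files or queries that do not start with SELECT) may need a manual decision.

```python
import os

# Hypothetical mapping from file extension to the ExampleGen listed above.
_EXTENSION_TO_EXAMPLE_GEN = {
    ".csv": "CsvExampleGen",
    ".tfrecord": "ImportExampleGen",
    ".avro": "FileBasedExampleGen",
    ".parquet": "FileBasedExampleGen",
}


def suggest_example_gen(source):
  """Suggest an ExampleGen name for a file name or a SQL query string."""
  # Treat anything that looks like a SQL SELECT as a query-based source.
  if source.lstrip().upper().startswith("SELECT"):
    return "BigQueryExampleGen"
  _, extension = os.path.splitext(source.lower())
  if extension not in _EXTENSION_TO_EXAMPLE_GEN:
    raise ValueError("no suggestion for: {!r}".format(source))
  return _EXTENSION_TO_EXAMPLE_GEN[extension]


print(suggest_example_gen("data/penguins.csv"))
```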
After deciding which ExampleGen to use, you need to modify the pipeline definition to use your data.
- Modify DATA_PATH in local_runner.py and set it to the location of your files.
  - If you have files in the local environment, specify the path. This is the best option for developing or debugging a pipeline.
  - If the files are stored in GCS, use a path of the form gs://{bucket_name}/... and make sure that you can access GCS from your terminal, for example by using gsutil. Follow the authorization guide in Google Cloud if needed.
  - If you want to use a query-based ExampleGen like BigQueryExampleGen, you need a query statement to select data from the data source. There are a few more things you need to set to use Google Cloud BigQuery as a data source.
    - In pipeline/configs.py:
      - Change GOOGLE_CLOUD_PROJECT and GCS_BUCKET_NAME to your GCP project and bucket name. The bucket must exist before we run the pipeline.
      - Uncomment the BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS variable.
      - Uncomment the BIG_QUERY_QUERY variable and set it to your query statement.
    - In local_runner.py:
      - Comment out the data_path argument and uncomment the query argument in pipeline.create_pipeline().
    - In pipeline/pipeline.py:
      - Comment out the data_path argument and uncomment the query argument in create_pipeline().
      - Use BigQueryExampleGen instead of CsvExampleGen.
- Replace the existing CsvExampleGen with your ExampleGen class in pipeline/pipeline.py. Each ExampleGen class has a different signature. See the ExampleGen component guide for more details. Don't forget to import the required modules with import statements in pipeline/pipeline.py.
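The swap between a file-based and a query-based ExampleGen might look roughly like the sketch below. It assumes the tfx.v1 public API (CsvExampleGen with input_base, and BigQueryExampleGen under extensions.google_cloud_big_query with query); the wrapper function build_example_gen is hypothetical, and the template's own pipeline.py may organize its imports and arguments differently.

```python
def build_example_gen(data_path=None, query=None):
  """Return an ExampleGen component for a file path or a BigQuery query."""
  if (data_path is None) == (query is None):
    raise ValueError("Provide exactly one of data_path or query.")
  # Imported lazily so the argument check above works without TFX installed.
  from tfx import v1 as tfx
  if query is not None:
    # Query-based ingestion: rows are selected by the SQL statement.
    return tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
        query=query)
  # File-based ingestion: reads the CSV files found under data_path.
  return tfx.components.CsvExampleGen(input_base=data_path)
```

Passing exactly one of the two arguments mirrors the instruction above to comment out data_path when query is used.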
The initial pipeline consists of four components: ExampleGen, StatisticsGen, SchemaGen, and ExampleValidator. We don't need to change anything for StatisticsGen, SchemaGen, and ExampleValidator. Let's run the pipeline for the first time.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:12.848598153 5127 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886553302 last_update_time_since_epoch: 1643886553302 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886556588 last_update_time_since_epoch: 1643886556588 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" 
} } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
You should see "Component ExampleValidator is finished." if the pipeline ran successfully.
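Because the run log is long, it can be easier to scan it programmatically. The following is a minimal sketch that assumes log lines of the form "Component <name> is finished.", as they appear in the output of this run; the function name finished_components and the sample text are made up for illustration.

```python
import re

_FINISHED = re.compile(r"Component (\w+) is finished\.")


def finished_components(log_text):
  """Return the names of components reported as finished, in log order."""
  return _FINISHED.findall(log_text)


# Shortened, hand-written sample in the same format as the real log.
sample_log = (
    "INFO:absl:Component CsvExampleGen is finished. "
    "INFO:absl:Component StatisticsGen is running. "
    "INFO:absl:Component StatisticsGen is finished."
)
print(finished_components(sample_log))
```

On a captured log, checking that "ExampleValidator" is in the returned list is one way to confirm the run reached the last component.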
Examine the output of the pipeline.
A TFX pipeline produces two kinds of output: artifacts, and a metadata database (MLMD) that contains metadata about artifacts and pipeline executions. The location of the output is defined in local_runner.py. By default, artifacts are stored under the tfx_pipeline_output directory and metadata is stored as a SQLite database under the tfx_metadata directory.
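Since the metadata is an ordinary SQLite file, a quick sanity check is possible with the Python standard library alone. The sketch below assumes the layout seen in the run log above (./tfx_metadata/<pipeline name>/metadata.db); both helper names are hypothetical, and the check only lists table names without interpreting the MLMD schema.

```python
import os
import sqlite3


def metadata_db_path(pipeline_name, root="."):
  """Path of the SQLite metadata file, following the layout in the run log."""
  return os.path.join(root, "tfx_metadata", pipeline_name, "metadata.db")


def list_tables(db_path):
  """Return the table names stored in an existing SQLite database file."""
  if not os.path.exists(db_path):
    # sqlite3.connect would silently create an empty file otherwise.
    raise FileNotFoundError(db_path)
  connection = sqlite3.connect(db_path)
  try:
    rows = connection.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
  finally:
    connection.close()
  return [name for (name,) in rows]
```

After a successful run, list_tables(metadata_db_path("my_pipeline")) called from the project directory should return a non-empty list.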
You can use MLMD APIs to examine these outputs. First, we will define some utility functions to search for the output artifacts that were just produced.
import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils
# TODO(b/171447278): Move these functions into TFX library.
def get_latest_executions(store, pipeline_name, component_id=None):
  """Fetch all pipeline runs."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
    ]
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
    ]
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)
def get_latest_artifacts(store, pipeline_name, component_id=None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)
  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  ]
  return store.get_artifacts_by_id(artifact_ids)
def find_latest_artifacts_by_type(store, artifacts, artifact_type):
"""Get the latest artifacts of a specified type."""
# Get type information from MLMD
try:
artifact_type = store.get_artifact_type(artifact_type)
except errors.NotFoundError:
return []
# Filter artifacts with type.
filtered_artifacts = [aritfact for aritfact in artifacts
if aritfact.type_id == artifact_type.id]
# Convert MLMD artifact data into TFX Artifact instances.
return [artifact_utils.deserialize_artifact(artifact_type, artifact)
for artifact in filtered_artifacts]
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()
import os
import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)
      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
                                        compression_type="GZIP")
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()
        example.ParseFromString(serialized_example)
        pp.pprint(example)

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(
    local_runner.METADATA_PATH)
Now we can read the metadata of the output artifacts from MLMD.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  # Find artifacts of Examples type.
  examples_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.Examples.TYPE_NAME)
  # Find artifacts generated from StatisticsGen.
  stats_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ExampleStatistics.TYPE_NAME)
  # Find artifacts generated from SchemaGen.
  schema_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.Schema.TYPE_NAME)
  # Find artifacts generated from ExampleValidator.
  anomalies_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ExampleAnomalies.TYPE_NAME)
Now we can examine the outputs of each component. TensorFlow Data Validation (TFDV) is used in StatisticsGen, SchemaGen and ExampleValidator, and TFDV can be used to visualize the outputs of these components.
In this tutorial, we will use the visualization helper methods in TFX, which use TFDV internally to render the visualizations. See the TFX components tutorial to learn more about each component.
Examine the output from ExampleGen
Let's examine the output from ExampleGen. Take a look at the first two examples for each split:
preview_examples(examples_artifacts)
By default, TFX ExampleGen divides the examples into two splits, train and eval, but you can adjust your split configuration.
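The logged output_config above assigns 2 hash buckets to train and 1 to eval, which yields roughly a 2:1 split. The sketch below illustrates the idea of hash-bucket splitting in plain Python. It is not TFX's implementation: the choice of SHA-256 as the hash and the helper name assign_split are assumptions made for illustration only.

```python
import hashlib

# (split name, hash_buckets), taken from the output_config shown in the logs.
SPLITS = [("train", 2), ("eval", 1)]


def assign_split(record: bytes, splits=SPLITS) -> str:
    """Map a serialized record to a split by hashing it into a bucket.

    Illustrative only: the hash function here is an assumption and differs
    from whatever ExampleGen uses internally.
    """
    total_buckets = sum(buckets for _, buckets in splits)
    digest = hashlib.sha256(record).digest()
    bucket = int.from_bytes(digest[:8], "big") % total_buckets
    upper_bound = 0
    for name, buckets in splits:
        upper_bound += buckets
        if bucket < upper_bound:
            return name
    raise AssertionError("bucket index is always below total_buckets")


counts = {"train": 0, "eval": 0}
for i in range(3000):
    counts[assign_split(f"record-{i}".encode())] += 1
print(counts)  # Roughly two thirds of the records land in train.
```

Because the assignment depends only on the record content, the same record always lands in the same split, and changing the bucket counts changes the ratio.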
Examine the output from StatisticsGen
visualize_artifacts(stats_artifacts)
These statistics are supplied to SchemaGen to construct a data schema automatically.
Examine the output from SchemaGen
visualize_artifacts(schema_artifacts)
This schema is inferred automatically from the output of StatisticsGen. We will use this generated schema in this tutorial, but you can also modify and customize the schema.
Examine the output from ExampleValidator
visualize_artifacts(anomalies_artifacts)
If any anomalies are found, review your data and check that all examples follow your assumptions. Outputs from other components such as StatisticsGen might be useful. Any anomalies found do not block the pipeline execution.
You can see the available features in the outputs of SchemaGen. If your features can be used to construct the ML model in Trainer directly, you can skip the next step and go to Step 4. Otherwise, you can do some feature engineering work in the next step. The Transform component is needed when full-pass operations such as computing averages are required, which is typically the case when you need to scale feature values.
Step 3. (Optional) Feature engineering with the Transform component.
In this step, you will define the feature engineering work that will be used by the Transform component in the pipeline. See the Transform component guide for more information.
This is only necessary if your training code requires additional features that are not available in the output of ExampleGen. Otherwise, feel free to skip ahead to the next step, which uses Trainer.
Define features of the model
models/features.py contains constants that define the features for the model, including feature names, vocabulary sizes and so on. By default, the penguin template has two constants, FEATURE_KEYS and LABEL_KEY, because our penguin model solves a classification problem using supervised learning and all features are continuous numeric features. See the feature definitions from the Chicago taxi example for another example.
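As a rough picture of what such a feature definition module can look like, here is a minimal sketch. The column names are assumed from the Palmer Penguins dataset, and the transformed_name helper with its '_xf' suffix is an illustrative convention, so check your own copy of models/features.py for the actual contents.

```python
from typing import List

# Assumed column names from the Palmer Penguins dataset; adjust to your data.
FEATURE_KEYS: List[str] = [
    'culmen_length_mm',
    'culmen_depth_mm',
    'flipper_length_mm',
    'body_mass_g',
]

# The column the model is trained to predict.
LABEL_KEY = 'species'


def transformed_name(key: str) -> str:
    """Return the name used for a feature after preprocessing.

    The '_xf' suffix is an illustrative convention, not a requirement.
    """
    return key + '_xf'


def transformed_names(keys: List[str]) -> List[str]:
    """Apply transformed_name to every key in a list."""
    return [transformed_name(key) for key in keys]


print(transformed_names(FEATURE_KEYS))
```

Keeping these constants in one module lets the preprocessing code and the model code agree on feature names without repeating string literals.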
Implement preprocessing for training / serving in preprocessing_fn().
The actual feature engineering happens in the preprocessing_fn() function in models/preprocessing.py.
In preprocessing_fn you can define a series of functions that manipulate the input dict of tensors to produce the output dict of tensors. There are helper functions such as scale_to_0_1 and compute_and_apply_vocabulary in the TensorFlow Transform API, or you can simply use regular TensorFlow functions. By default, the penguin template includes example usages of the tft.scale_to_z_score function to normalize feature values.
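To see why z-score scaling is a full-pass operation, consider this plain-Python sketch. It is not how TensorFlow Transform implements tft.scale_to_z_score; the helper names are hypothetical, and the use of the population standard deviation is an assumption. The point is only that the mean and deviation must be computed over the whole dataset before any single value can be scaled.

```python
import math
from typing import List, Tuple


def fit_z_score(values: List[float]) -> Tuple[float, float]:
    """Full pass over the data: compute mean and population std deviation."""
    n = len(values)
    mean = sum(values) / n
    variance = sum((v - mean) ** 2 for v in values) / n
    return mean, math.sqrt(variance)


def apply_z_score(value: float, mean: float, std: float) -> float:
    """Per-example step: scale one value using the precomputed statistics."""
    # Guard against constant columns, where the deviation is zero.
    return (value - mean) / std if std > 0 else 0.0


body_mass = [1.0, 2.0, 3.0, 4.0, 5.0]  # Toy values, not real penguin data.
mean, std = fit_z_score(body_mass)
scaled = [apply_z_score(v, mean, std) for v in body_mass]
print(mean, std, scaled)
```

The same two-phase shape applies at serving time: the statistics are fixed after the analysis pass, and only the per-example step runs on incoming requests.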
See the TensorFlow Transform guide for more information about authoring preprocessing_fn.
Add the Transform component to the pipeline.
If your preprocessing_fn is ready, add the Transform component to the pipeline.
- In the pipeline/pipeline.py file, uncomment # components.append(transform) to add the component to the pipeline.
You can update the pipeline and run it again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 4 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:37.596944686 5287 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 4 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 4 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 5 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886578210 last_update_time_since_epoch: 1643886578210 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 5 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 5 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 6 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886581527 last_update_time_since_epoch: 1643886581527 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" 
} } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 6 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 6 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
If the pipeline ran successfully, you should see "Component Transform is finished." somewhere in the log. Because the Transform component and the ExampleValidator component do not depend on each other, the order in which they execute is not fixed. Either Transform or ExampleValidator can therefore be the last component in the pipeline execution.
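The ordering behavior follows from the dependency graph. The sketch below groups components into waves whose upstream nodes are all finished. The edges for CsvExampleGen, StatisticsGen and SchemaGen match the upstream_nodes shown in the logs, while the inputs listed for ExampleValidator and Transform are assumptions about the template's wiring; the helper name execution_waves is hypothetical.

```python
from typing import Dict, List, Set

# Component -> set of upstream components it waits for.
UPSTREAM: Dict[str, Set[str]] = {
    "CsvExampleGen": set(),
    "StatisticsGen": {"CsvExampleGen"},
    "SchemaGen": {"StatisticsGen"},
    # Assumed wiring for the two components below.
    "ExampleValidator": {"StatisticsGen", "SchemaGen"},
    "Transform": {"CsvExampleGen", "SchemaGen"},
}


def execution_waves(upstream: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into waves; nodes in the same wave have no mutual order."""
    done: Set[str] = set()
    waves: List[List[str]] = []
    while len(done) < len(upstream):
        ready = sorted(
            node for node, deps in upstream.items()
            if node not in done and deps <= done)
        if not ready:
            raise ValueError("dependency cycle detected")
        waves.append(ready)
        done.update(ready)
    return waves


for wave in execution_waves(UPSTREAM):
    print(wave)
```

Under these assumptions, ExampleValidator and Transform end up in the same final wave, which is why either one may finish last.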
Examine the output from Transform
The Transform component creates two kinds of outputs: a TensorFlow graph and transformed examples. The transformed examples are of the Examples artifact type, which is also produced by ExampleGen, but this artifact contains transformed feature values instead.
You can examine them as we did in the previous step.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous run of Transform component.
  artifacts = get_latest_artifacts(metadata_handler.store,
                                   PIPELINE_NAME, "Transform")
  # Find artifacts of Examples type.
  transformed_examples_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.Examples.TYPE_NAME)
preview_examples(transformed_examples_artifacts)
Step 4. Train your model with the Trainer component.
We will build an ML model using the Trainer component. See the Trainer component guide for more information. You need to provide your model code to the Trainer component.
Define your model.
In the penguin template, models.model.run_fn is used as the run_fn argument for the Trainer component. This means that the run_fn() function in models/model.py will be called when the Trainer component runs. The provided code builds a simple DNN model using the keras API. See the TensorFlow 2.x in TFX guide for more information about using the keras API in TFX.
In this run_fn, you should build a model and save it to the directory pointed to by fn_args.serving_model_dir, which is specified by the component. You can also use the other arguments in fn_args that are passed to run_fn. See the related code for the full list of arguments in fn_args.
Define your features in models/features.py and use them as needed. If you transformed your features in Step 3, you should use the transformed features as inputs to your model.
Add the Trainer component to the pipeline.
If your run_fn is ready, add the Trainer component to the pipeline.
- In the pipeline/pipeline.py file, uncomment # components.append(trainer) to add the component to the pipeline.
The arguments for the Trainer component may depend on whether you use the Transform component or not.
- If you do NOT use the Transform component, you don't need to change the arguments.
- If you use the Transform component, you need to change the arguments when creating the Trainer component instance.
  - Change the examples argument to examples=transform.outputs['transformed_examples']. We need to use transformed examples for training.
  - Add the transform_graph argument as transform_graph=transform.outputs['transform_graph']. This graph contains the TensorFlow graph for the transform operations.
  - After the above changes, the code for creating the Trainer component will look like the following.
# If you use a Transform component.
trainer = Trainer(
    run_fn=run_fn,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    ...
You can update the pipeline and run it again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 7 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:01.173700221 5436 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 7 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 7 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 8 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886601629 last_update_time_since_epoch: 1643886601629 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 8 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 8 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 9 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886605023 last_update_time_since_epoch: 1643886605023 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" 
} } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 9 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 9 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
If this run completes successfully, you have created and run your first TFX pipeline for your model. Congratulations!
Your new model is stored somewhere under the output directory, but that directory also holds many interim results from the TFX pipeline. It is better to keep the model in a fixed location or in a service outside the pipeline. It is better still to evaluate each built model continuously, which is critical in production ML systems. The next step shows how continuous evaluation and deployment work in TFX.
Step 5. (Optional) Evaluate the model with Evaluator and publish it with Pusher.
The Evaluator component continuously evaluates every model built by Trainer, and Pusher copies the model to a predefined location in the file system or even to Google Cloud AI Platform Models.
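To make the hand-off between evaluation and publishing concrete, here is a minimal conceptual sketch of "copy the model to a fixed location only if it passed evaluation". It uses only the Python standard library and is not TFX's Pusher implementation; the function name `push_model`, the directory layout, and the `blessed` flag are assumptions made for illustration.

```python
import shutil
import tempfile
from pathlib import Path


def push_model(model_dir: Path, serving_root: Path, version: int, blessed: bool) -> bool:
    """Copy model_dir to serving_root/<version>, but only for a blessed model.

    Hypothetical helper: it mimics the idea behind Pusher, not its real code.
    """
    if not blessed:
        # A model that failed evaluation is never published.
        return False
    destination = serving_root / str(version)
    # copytree creates the destination directory and any missing parents.
    shutil.copytree(model_dir, destination)
    return True


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Stand-in for a model written by Trainer under the pipeline output directory.
    model_dir = root / "pipeline_output" / "Trainer" / "model" / "5"
    model_dir.mkdir(parents=True)
    (model_dir / "saved_model.pb").write_bytes(b"placeholder")

    serving_root = root / "serving_model"
    pushed = push_model(model_dir, serving_root, version=1, blessed=True)
    skipped = push_model(model_dir, serving_root, version=2, blessed=False)
    pushed_files = sorted(p.name for p in (serving_root / "1").iterdir())
    print(pushed, skipped, pushed_files)
```

The point of the fixed `serving_root` is that a serving system can watch one stable path instead of searching through the interim artifacts of each pipeline run.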
Add the Evaluator component to the pipeline.
In the pipeline/pipeline.py file:
- Uncomment # components.append(model_resolver) to add the latest model resolver to the pipeline. Evaluator can compare a model with the old baseline model that passed Evaluator in the last pipeline run. LatestBlessedModelResolver finds the latest model that passed Evaluator.
- Set a proper tfma.MetricsSpec for your model. Evaluation may differ for every ML model. In the penguin template, SparseCategoricalAccuracy was used because we are solving a multi-category classification problem. You also need to specify tfma.SlicingSpec to analyze your model for specific slices. For more detail, see the Evaluator component guide.
- Uncomment # components.append(evaluator) to add the component to the pipeline.
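The comparison against a baseline model described in the list above can be sketched in plain Python. This is a simplified illustration of the idea, not TFMA's code: in TFMA the corresponding rules are declared through tfma.MetricThreshold (with a value threshold and a change threshold), and the exact comparison semantics there may differ. The function name `is_blessed` and the default values `lower_bound=0.6` and `min_change=-1e-10` are assumptions chosen for this example.

```python
from typing import Optional


def is_blessed(candidate_accuracy: float,
               baseline_accuracy: Optional[float],
               lower_bound: float = 0.6,
               min_change: float = -1e-10) -> bool:
    """Decide whether a candidate model passes evaluation (hypothetical rule).

    1. Value threshold: the candidate's accuracy must reach lower_bound.
    2. Change threshold: if a previously blessed baseline exists, the
       candidate must not be worse than it by more than |min_change|.
    """
    if candidate_accuracy < lower_bound:
        return False
    if baseline_accuracy is None:
        # First run: there is no blessed model to compare against yet.
        return True
    return candidate_accuracy - baseline_accuracy >= min_change


print(is_blessed(0.90, None))   # first run, above the lower bound
print(is_blessed(0.50, None))   # below the lower bound
print(is_blessed(0.90, 0.95))   # worse than the baseline
print(is_blessed(0.95, 0.90))   # better than the baseline
```

On the first pipeline run there is no blessed baseline, so only the value threshold applies; from the second run on, the model found by the resolver serves as the baseline.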
You can now update the pipeline and run it again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 10 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:24.894390124 5584 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 10 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 10 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 11 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886625515 last_update_time_since_epoch: 1643886625515 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 11 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 11 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 12 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886628941 last_update_time_since_epoch: 1643886628941 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: 
"Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 12 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 12 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Examine the Evaluator output
This step requires the TensorFlow Model Analysis (TFMA) Jupyter notebook extension. Note that the version of the TFMA notebook extension must be identical to the version of the TFMA Python package.
The following command installs the TFMA notebook extension from the NPM registry. It might take several minutes to complete.
# Install TFMA notebook extension.
!jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir] [--paths] [--json] [--debug] [subcommand] Jupyter: Interactive Computing positional arguments: subcommand the subcommand to launch optional arguments: -h, --help show this help message and exit --version show the versions of core jupyter packages and exit --config-dir show Jupyter config dir --data-dir show Jupyter data dir --runtime-dir show Jupyter runtime dir --paths show all Jupyter paths. Add --json for machine-readable format. --json output paths as machine-readable json --debug output debug information about paths Available subcommands: bundlerextension console dejavu execute kernel kernelspec migrate nbconvert nbextension notebook qtconsole run serverextension troubleshoot trust Jupyter command `jupyter-labextension` not found.
If the installation completes, reload your browser so that the extension takes effect.
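The version requirement stated above can be made explicit before running the install command. The following is a minimal sketch, not part of the TFX template: the helper names labextension_spec and versions_match are made up for illustration, and the version string is the TFMA version printed earlier in this tutorial.

```python
def labextension_spec(tfma_version: str) -> str:
    """Build the npm spec that pins the extension to the package version."""
    return f"tensorflow_model_analysis@{tfma_version}"


def versions_match(package_version: str, extension_version: str) -> bool:
    """Return True only when both version strings are exactly the same."""
    return package_version.strip() == extension_version.strip()


# Hypothetical usage with the TFMA version reported earlier (0.37.0).
print(labextension_spec("0.37.0"))
print(versions_match("0.37.0", "0.37.0"))
```

In a notebook, the argument would normally be tfma.__version__ rather than a literal string.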
with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all artifacts from the previous pipeline run.
    artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
    model_evaluation_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ModelEvaluation.TYPE_NAME)
# Render the metrics only if the run produced a ModelEvaluation artifact.
if model_evaluation_artifacts:
    tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
    tfma.view.render_slicing_metrics(tfma_result)
Add the Pusher component to the pipeline.
If the model looks promising, we need to publish it. The Pusher component can publish the model to a location in the filesystem or to GCP AI Platform Models using a custom executor.
The Evaluator component continuously evaluates every model built by the Trainer, and the Pusher copies the model to a predefined location in the filesystem or even to Google Cloud AI Platform Models.
- In local_runner.py, set SERVING_MODEL_DIR to the directory to publish to.
- In pipeline/pipeline.py, uncomment # components.append(pusher) to add the Pusher to the pipeline.
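Conceptually, publishing to the filesystem amounts to copying an exported model into the serving directory. The sketch below is not the Pusher implementation. It assumes a hypothetical layout in which every pushed model lands in a subdirectory named after an integer version, and the function name push_model and the placeholder file are made up for illustration.

```python
import os
import shutil
import tempfile
import time
from typing import Optional


def push_model(model_dir: str, serving_model_dir: str,
               version: Optional[int] = None) -> str:
    """Copy an exported model into serving_model_dir/<version>.

    The integer-named subdirectory is an assumption of this sketch.
    """
    if version is None:
        version = int(time.time())  # Assumed versioning scheme: Unix timestamp.
    destination = os.path.join(serving_model_dir, str(version))
    # copytree creates missing parent directories and fails if the
    # destination already exists, so an earlier push is never overwritten.
    shutil.copytree(model_dir, destination)
    return destination


# Demonstration with throwaway directories standing in for real paths.
with tempfile.TemporaryDirectory() as workdir:
    exported = os.path.join(workdir, "exported_model")
    os.makedirs(exported)
    with open(os.path.join(exported, "saved_model.pb"), "w") as f:
        f.write("placeholder")
    pushed = push_model(exported, os.path.join(workdir, "serving_model"))
    print(os.listdir(pushed))
```

Keeping each push in its own subdirectory means earlier models stay available if a newer one has to be rolled back.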
You can now update the pipeline and run it again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 13 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=13, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { 
string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_file_format': 5, 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/13/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: 
"version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:49.163841363 5734 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 13 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 13 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { 
artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 14 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=14, input_dict={'examples': [Artifact(artifact: id: 13 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/13" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886649739 last_update_time_since_epoch: 1643886649739 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" 
properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/14/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') 
INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 14 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 14 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 15 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=15, input_dict={'statistics': [Artifact(artifact: id: 14 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE 
create_time_since_epoch: 1643886653128 last_update_time_since_epoch: 1643886653128 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:48.556314', tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/15/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:48.556314" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: 
"Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:48.556314') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 15 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:48.556314:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 15 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
You should find your new model in SERVING_MODEL_DIR.
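To check the result from Python, the serving directory can be inspected directly. The following is a small sketch under one assumption that is not stated in this tutorial: that each published model sits in a subdirectory of SERVING_MODEL_DIR whose name is an integer version. The helper name latest_model_dir is hypothetical.

```python
import os
import tempfile
from typing import Optional


def latest_model_dir(serving_model_dir: str) -> Optional[str]:
    """Return the subdirectory with the highest integer name, or None."""
    if not os.path.isdir(serving_model_dir):
        return None
    versions = [
        name for name in os.listdir(serving_model_dir)
        if name.isdigit()
        and os.path.isdir(os.path.join(serving_model_dir, name))
    ]
    if not versions:
        return None
    # Compare numerically so that "10" sorts after "9".
    return os.path.join(serving_model_dir, max(versions, key=int))


# Demonstration with a temporary directory standing in for SERVING_MODEL_DIR.
with tempfile.TemporaryDirectory() as serving_dir:
    for version in ("1643886000", "1643886700"):
        os.makedirs(os.path.join(serving_dir, version))
    print(latest_model_dir(serving_dir))
```

A return value of None means that nothing has been published yet, for example because the Pusher was not added to the pipeline.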
Step 6. (Optional) Deploy your pipeline to Kubeflow Pipelines on GCP.
As mentioned earlier, local_runner.py is good for debugging or development purposes, but it is not the best solution for production workloads. In this step, we will deploy the pipeline to Kubeflow Pipelines on Google Cloud.
Preparation
We need the kfp Python package and the skaffold program to deploy a pipeline to a Kubeflow Pipelines cluster.
pip install --upgrade -q kfp
# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold
You need to move the skaffold binary to a location where your shell can find it. Alternatively, you can pass the path to skaffold with the --skaffold-cmd flag when you run the tfx binary.
# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory
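Whether the shell can actually find skaffold can be checked from Python with the standard library. This is a minimal sketch: the helper name find_program and the extra directory ~/.local/bin are illustrative assumptions, so adjust them to wherever the binary was moved.

```python
import os
import shutil
from typing import Optional


def find_program(name: str, extra_dir: Optional[str] = None) -> Optional[str]:
    """Return the full path of an executable, or None when it is not found."""
    search_path = os.environ.get("PATH", "")
    if extra_dir:
        # Look in the extra directory first, then in the regular PATH.
        search_path = extra_dir + os.pathsep + search_path
    return shutil.which(name, path=search_path)


location = find_program("skaffold",
                        extra_dir=os.path.expanduser("~/.local/bin"))
print(location or "skaffold not found; pass its path with --skaffold-cmd")
```

If the lookup returns None, either the move failed, as in the output above, or the target directory is not on PATH.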
You also need a Kubeflow Pipelines cluster to run the pipeline. Follow Steps 1 and 2 in the TFX on Cloud AI Platform Pipelines tutorial.
When your cluster is ready, open the pipeline dashboard by clicking Open Pipelines Dashboard on the Pipelines page of the Google Cloud console. The URL of this page is the ENDPOINT used to request a pipeline run. The endpoint value is everything in the URL after https://, up to and including googleusercontent.com. Put your endpoint in the following code block.
ENDPOINT='' # Enter your ENDPOINT here.
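The rule described above for deriving the endpoint from the dashboard URL can be sketched as a small helper. The function name extract_endpoint and the example URL are hypothetical; only the rule itself (everything after https://, up to and including googleusercontent.com) comes from this tutorial.

```python
def extract_endpoint(dashboard_url: str) -> str:
    """Keep everything after the scheme, up to and including the marker."""
    marker = "googleusercontent.com"
    # Drop "https://" (or any other scheme) if it is present.
    without_scheme = dashboard_url.split("://", 1)[-1]
    end = without_scheme.find(marker)
    if end == -1:
        raise ValueError(f"URL does not contain {marker!r}: {dashboard_url}")
    return without_scheme[:end + len(marker)]


# Made-up dashboard URL, used only to illustrate the rule.
url = "https://abc123-dot-us-central1.pipelines.googleusercontent.com/#/pipelines"
print(extract_endpoint(url))
```

The returned string is what would be assigned to ENDPOINT.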
To run our code in a Kubeflow Pipelines cluster, we need to package it into a container image. The image will be built automatically while deploying our pipeline, and you only need to set a name and a container registry for your image. In our example, we will use Google Container Registry and name the image tfx-pipeline.
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'
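The cell above fails with an IndexError when gcloud has no project configured, because shell_output is then empty. A slightly more defensive variant is sketched below. The function name build_image_name is hypothetical, and the list argument stands in for the shell_output value captured in the notebook.

```python
from typing import Sequence


def build_image_name(shell_output: Sequence[str],
                     image: str = "tfx-pipeline") -> str:
    """Compose gcr.io/<project>/<image> from captured gcloud output."""
    project = shell_output[0].strip() if shell_output else ""
    if not project:
        raise ValueError(
            "No GCP project configured; set one with "
            "`gcloud config set project PROJECT_ID` first.")
    return f"gcr.io/{project}/{image}"


# "my-gcp-project" is a placeholder project ID.
print(build_image_name(["my-gcp-project"]))
```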
Set the data location.
Your data must be accessible from the Kubeflow Pipelines cluster. If you used data in your local environment, you may need to upload it to remote storage such as Google Cloud Storage. For example, we can upload the penguin data to the default bucket, which is created automatically when a Kubeflow Pipelines cluster is deployed, as follows.
!gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]... NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted
Update the data location stored in DATA_PATH in kubeflow_runner.py.
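The value to store in DATA_PATH follows from the upload command above. As a minimal sketch, assuming the data went to the default bucket under the same prefix, the path can be composed as follows; the function name default_data_path is made up, and the exact form of DATA_PATH in your copy of kubeflow_runner.py may differ.

```python
def default_data_path(project_id: str) -> str:
    """Compose the GCS directory that the upload command above wrote to."""
    bucket = f"{project_id}-kubeflowpipelines-default"
    return f"gs://{bucket}/tfx-template/data/penguin/"


# "my-gcp-project" is a placeholder project ID.
print(default_data_path("my-gcp-project"))
```

Note that the path points at the directory that contains data.csv, not at the file itself.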
If you are using BigQueryExampleGen, there is no need to upload the data file, but make sure that kubeflow_runner.py passes the same query and beam_pipeline_args arguments to the pipeline.create_pipeline() function.
Deploy the pipeline.
If everything is ready, you can create a pipeline using the tfx pipeline create command.
!tfx pipeline create \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
CLI [Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.
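Outside a notebook, the deployment command can be assembled as an argument list. This is a minimal sketch: the helper pipeline_create_args is hypothetical, and, following the CLI message above, it uses --build-image without an argument. It also rejects an empty ENDPOINT before the CLI is invoked.

```python
def pipeline_create_args(endpoint, pipeline_path="kubeflow_runner.py",
                         build_image=True):
    """Build the argv list for `tfx pipeline create` on Kubeflow."""
    if not endpoint:
        raise ValueError("ENDPOINT is empty; set it before deploying")
    args = [
        "tfx", "pipeline", "create",
        "--engine=kubeflow",
        "--pipeline-path=" + pipeline_path,
        "--endpoint=" + endpoint,
    ]
    if build_image:
        args.append("--build-image")
    return args


# Usage (this would invoke the real CLI):
# import subprocess
# subprocess.run(pipeline_create_args(ENDPOINT), check=True)
```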
Now start a run of the newly created pipeline using the tfx run create command.
!tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Failed to load kube config.
Traceback (most recent call last):
...
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))
Alternatively, you can run the pipeline from the Kubeflow Pipelines dashboard. The new run is listed under Experiments in the dashboard. Clicking the experiment lets you monitor progress and view the artifacts created during the run.
If you are interested in running your pipeline on Kubeflow Pipelines, you can find more instructions in the TFX on Cloud AI Platform Pipelines tutorial.
Cleaning up
To clean up all Google Cloud resources used in this step, you can delete the Google Cloud project you used for the tutorial.
Alternatively, you can clean up individual resources by visiting each console: