Introduction
This document provides instructions for creating a TensorFlow Extended (TFX) pipeline for your own dataset using the penguin template, which is provided with the TFX Python package. The pipeline you create will initially use the Palmer Penguins dataset, and then we will transform the pipeline to use your dataset.
Prerequisites
- Linux / MacOS
- Python 3.6-3.8
- Jupyter notebook
Step 1. Copy the predefined template into your project directory.
In this step, we will create a working pipeline project directory and files by copying files from the penguin template in TFX. You can think of this as a scaffold for your TFX pipeline project.
Update Pip
If we are running in Colab, we should make sure that we have the latest version of Pip. Local systems can of course be upgraded separately.
import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip
Install the required packages
First, install TFX and TensorFlow Model Analysis (TFMA).
pip install -U tfx tensorflow-model-analysis
Let's check the versions of TFX.
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx
print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.7.1 TFMA version: 0.37.0 TFX version: 1.6.0
We are ready to create a pipeline.
Set PROJECT_DIR to an appropriate destination for your environment. The default value is ~/imported/${PIPELINE_NAME}, which is appropriate for the Google Cloud AI Platform Notebook environment.
You can give your pipeline a different name by changing PIPELINE_NAME below. This will also become the name of the project directory where your files will be put.
PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)
Copy template files.
TFX includes the penguin template with the TFX python package. The penguin template contains many instructions for bringing your dataset into the pipeline, which is the purpose of this tutorial.
The tfx template copy CLI command copies the predefined template files into your project directory.
# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
!tfx template copy \
--pipeline-name={PIPELINE_NAME} \
--destination-path={PROJECT_DIR} \
--model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin CLI Copying penguin pipeline template kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py __init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py model.py -> /home/kbuilder/imported/my_pipeline/models/model.py features.py -> /home/kbuilder/imported/my_pipeline/models/features.py features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py __init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py __init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
Change the working directory context in this notebook to the project directory.
%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline
Explore your copied source files
The TFX template provides basic scaffold files to build a pipeline, including Python source code and sample data. The penguin template uses the same Palmer Penguins dataset and ML model as the penguin example.
Here is a brief introduction to each of the Python files.
- pipeline: this directory contains the definition of the pipeline
  - configs.py: defines common constants for pipeline runners
  - pipeline.py: defines TFX components and a pipeline
- models: this directory contains ML model definitions
  - features.py, features_test.py: defines features for the model
  - preprocessing.py, preprocessing_test.py: defines preprocessing routines for data
  - constants.py: defines constants of the model
  - model.py, model_test.py: defines the ML model using ML frameworks like TensorFlow
- local_runner.py: defines a runner for the local environment, which uses a local orchestration engine
- kubeflow_runner.py: defines a runner for the Kubeflow Pipelines orchestration engine
By default, the template only includes standard TFX components. If you need some customized actions, you can create custom components for your pipeline. See the TFX custom component guide for details.
Unit test files.
You may notice that there are some files with _test.py in their name. These are unit tests of the pipeline, and it is recommended to add more unit tests as you implement your own pipelines. You can run unit tests by supplying the module name of test files with the -m flag. You can usually get a module name by deleting the .py extension and replacing / with .. For example:
import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python [ RUN ] FeaturesTest.testLabelKey INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s I0203 11:08:46.306882 140258321348416 test_util.py:2309] time(__main__.FeaturesTest.testLabelKey): 0.0s [ OK ] FeaturesTest.testLabelKey [ RUN ] FeaturesTest.test_session [ SKIPPED ] FeaturesTest.test_session ---------------------------------------------------------------------- Ran 2 tests in 0.001s OK (skipped=1)
Create a TFX pipeline in a local environment.
TFX supports several orchestration engines to run pipelines. We will use the local orchestration engine. The local orchestration engine runs without any further dependencies, and it is suitable for development and debugging because it runs in a local environment rather than depending on remote computing clusters.
We will use local_runner.py to run your pipeline with the local orchestrator. You have to create a pipeline before running it. You can create a pipeline with the pipeline create command.
tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI Creating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" created successfully.
The pipeline create command registers your pipeline, defined in local_runner.py, without actually running it. You will run the created pipeline with the run create command in the following steps.
Step 2. Ingest YOUR data into the pipeline.
The initial pipeline ingests the penguin dataset which is included in the template. You need to put your data into the pipeline, and most TFX pipelines start with the ExampleGen component.
Choose an ExampleGen
Your data can be stored anywhere your pipeline can access, on either a local or a distributed filesystem, or in a query-able system. TFX provides various ExampleGen components to bring your data into a TFX pipeline. You can choose one of the following ExampleGen components.
- CsvExampleGen: reads CSV files in a directory. Used in the penguin example and the Chicago taxi example.
- ImportExampleGen: takes TFRecord files in the TF Example data format. Used in the MNIST examples.
- FileBasedExampleGen: for the Avro or Parquet format.
- BigQueryExampleGen: reads data directly from Google Cloud BigQuery. Used in the Chicago taxi examples.
You can also create your own ExampleGen; for example, TFX includes a custom ExampleGen which uses Presto as a data source. See the guide for more information on how to use and develop custom executors.
Once you decide which ExampleGen to use, you will need to modify the pipeline definition to use your data.
- Modify DATA_PATH in local_runner.py and set it to the location of your files.
  - If you have files in your local environment, specify the path. This is the best option for developing or debugging a pipeline.
  - If the files are stored in GCS, you can use a path starting with gs://{bucket_name}/.... Make sure that you can access GCS from your terminal, for example, using gsutil. Follow the authorization guide in Google Cloud if needed.
  - If you want to use a Query-based ExampleGen like BigQueryExampleGen, you need a Query statement to select data from the data source. There are a few more things you need to set to use Google Cloud BigQuery as a data source (a code sketch follows this list).
    - In pipeline/configs.py:
      - Change GOOGLE_CLOUD_PROJECT and GCS_BUCKET_NAME to your GCP project and bucket name. The bucket should exist before we run the pipeline.
      - Uncomment the BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS variable.
      - Uncomment and set the BIG_QUERY_QUERY variable to your query statement.
    - In local_runner.py:
      - Comment out the data_path argument and uncomment the query argument in pipeline.create_pipeline().
    - In pipeline/pipeline.py:
      - Comment out the data_path argument and uncomment the query argument in create_pipeline().
      - Use BigQueryExampleGen instead of CsvExampleGen.
- Replace the existing CsvExampleGen with your ExampleGen class in pipeline/pipeline.py. Each ExampleGen class has a different signature. See the ExampleGen component guide for more detail. Don't forget to import the required modules with import statements in pipeline/pipeline.py.
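As a rough sketch of the BigQuery variant (assuming the TFX 1.x public API paths; query here stands for the value taken from BIG_QUERY_QUERY in pipeline/configs.py, and the commented-out line mirrors the template's file-based default), the ExampleGen definition in pipeline/pipeline.py would change along these lines:
from tfx.v1.extensions.google_cloud_big_query import BigQueryExampleGen

# example_gen = CsvExampleGen(input_base=data_path)  # file-based ingestion, commented out
example_gen = BigQueryExampleGen(query=query)  # query-based ingestion from BigQuery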
The initial pipeline consists of four components: ExampleGen, StatisticsGen, SchemaGen and ExampleValidator. We don't need to change anything for StatisticsGen, SchemaGen and ExampleValidator. Let's run the pipeline for the first time.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: 
"my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:12.848598153 5127 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. 
INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886553302 last_update_time_since_epoch: 1643886553302 , artifact_type: id: 15 name: 
"Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'statistics': [Artifact(artifact: id: 2 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886556588 last_update_time_since_epoch: 1643886556588 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:12.120566', 
tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:12.120566" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:12.120566') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/3" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:12.120566:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
You should see "Component ExampleValidator is finished." if the pipeline ran successfully.
Examine the output of the pipeline.
A TFX pipeline produces two kinds of output: artifacts and a metadata DB (MLMD) which contains metadata of the artifacts and the pipeline executions. The location of the output is defined in local_runner.py. By default, artifacts are stored under the tfx_pipeline_output directory and metadata is stored as an sqlite database under the tfx_metadata directory.
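For instance, assuming the default relative locations configured in local_runner.py (visible in the log output above), you can peek at the generated directories from the notebook:
!ls tfx_pipeline_output/my_pipeline
!ls tfx_metadata/my_pipeline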
You can use MLMD APIs to examine these outputs. First, we will define some utility functions to search for the output artifacts that were just produced.
import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils
# TODO(b/171447278): Move these functions into TFX library.
def get_latest_executions(store, pipeline_name, component_id = None):
"""Fetch all pipeline runs."""
if component_id is None: # Find entire pipeline runs.
run_contexts = [
c for c in store.get_contexts_by_type('run')
if c.properties['pipeline_name'].string_value == pipeline_name
]
else: # Find specific component runs.
run_contexts = [
c for c in store.get_contexts_by_type('component_run')
if c.properties['pipeline_name'].string_value == pipeline_name and
c.properties['component_id'].string_value == component_id
]
if not run_contexts:
return []
# Pick the latest run context.
latest_context = max(run_contexts,
key=lambda c: c.last_update_time_since_epoch)
return store.get_executions_by_context(latest_context.id)
def get_latest_artifacts(store, pipeline_name, component_id = None):
"""Fetch all artifacts from latest pipeline execution."""
executions = get_latest_executions(store, pipeline_name, component_id)
# Fetch all artifacts produced from the given executions.
execution_ids = [e.id for e in executions]
events = store.get_events_by_execution_ids(execution_ids)
artifact_ids = [
event.artifact_id for event in events
if event.type == metadata_store_pb2.Event.OUTPUT
]
return store.get_artifacts_by_id(artifact_ids)
def find_latest_artifacts_by_type(store, artifacts, artifact_type):
"""Get the latest artifacts of a specified type."""
# Get type information from MLMD
try:
artifact_type = store.get_artifact_type(artifact_type)
except errors.NotFoundError:
return []
# Filter artifacts with type.
  filtered_artifacts = [artifact for artifact in artifacts
                        if artifact.type_id == artifact_type.id]
# Convert MLMD artifact data into TFX Artifact instances.
return [artifact_utils.deserialize_artifact(artifact_type, artifact)
for artifact in filtered_artifacts]
from tfx.orchestration.experimental.interactive import visualizations
def visualize_artifacts(artifacts):
"""Visualizes artifacts using standard visualization modules."""
for artifact in artifacts:
visualization = visualizations.get_registry().get_visualization(
artifact.type_name)
if visualization:
visualization.display(artifact)
from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()
import pprint
from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts
def preview_examples(artifacts):
"""Preview a few records from Examples artifacts."""
pp = pprint.PrettyPrinter()
for artifact in artifacts:
print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
for split in artifact_utils.decode_split_names(artifact.split_names):
print("==== Reading from split:{}".format(split))
split_uri = artifact_utils.get_split_uri([artifact], split)
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(split_uri, name)
for name in os.listdir(split_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames,
compression_type="GZIP")
# Iterate over the first 2 records and decode them.
for tfrecord in dataset.take(2):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
import local_runner
metadata_connection_config = metadata.sqlite_metadata_connection_config(
local_runner.METADATA_PATH)
Now we can read the metadata of the output artifacts from MLMD.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous pipeline run.
artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
# Find artifacts of Examples type.
examples_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Examples.TYPE_NAME)
# Find artifacts generated from StatisticsGen.
stats_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ExampleStatistics.TYPE_NAME)
# Find artifacts generated from SchemaGen.
schema_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.Schema.TYPE_NAME)
# Find artifacts generated from ExampleValidator.
anomalies_artifacts = find_latest_artifacts_by_type(
metadata_handler.store, artifacts,
standard_artifacts.ExampleAnomalies.TYPE_NAME)
Now we can examine the outputs from each component. TensorFlow Data Validation (TFDV) is used in StatisticsGen, SchemaGen and ExampleValidator, and TFDV can be used to visualize the outputs from these components.
In this tutorial, we will use visualization helper methods in TFX which use TFDV internally to show the visualization. See the TFX components tutorial to learn more about each component.
Examine the output of ExampleGen
Let's examine the output from ExampleGen. Take a look at the first two examples for each split:
preview_examples(examples_artifacts)
By default, TFX ExampleGen divides examples into two splits, train and eval, but you can adjust your split configuration, as in the sketch below.
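As a hedged sketch of such an adjustment (the example_gen_pb2 proto ships with tfx.proto; data_path is assumed to be defined as in the template, and the 2:1 hash-bucket ratio matches the template default shown in the log above):
from tfx.proto import example_gen_pb2
from tfx.components import CsvExampleGen

# Explicit split configuration: 2 hash buckets for train, 1 for eval.
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=2),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
    ]))
example_gen = CsvExampleGen(input_base=data_path, output_config=output_config)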
Examine the output of StatisticsGen
visualize_artifacts(stats_artifacts)
These statistics are supplied to SchemaGen to construct a schema of the data automatically.
Examine the output of SchemaGen
visualize_artifacts(schema_artifacts)
This schema is automatically inferred from the output of StatisticsGen. We will use this generated schema in this tutorial, but you can also modify and customize the schema.
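For example, one possible way to curate the schema is to load, edit and re-save the schema text proto with TFDV APIs (a sketch: the feature name 'body_mass_g' and the domain edit are illustrative assumptions, not part of the template):
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

# Load the schema text proto produced by SchemaGen (first Schema artifact found above).
schema = tfdv.load_schema_text(
    os.path.join(schema_artifacts[0].uri, 'schema.pbtxt'))
# Illustrative edit: constrain a numeric feature to non-negative values.
tfdv.set_domain(schema, 'body_mass_g',
                schema_pb2.FloatDomain(name='body_mass_g', min=0.0))
# Write the curated schema back out for later use in the pipeline.
tfdv.write_schema_text(schema, 'curated_schema.pbtxt')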
Examine the output of ExampleValidator
visualize_artifacts(anomalies_artifacts)
If any anomalies were found, you may review your data so that all examples follow your assumptions. Outputs from other components like StatisticsGen might be useful. Any anomalies that are found do not block the pipeline execution.
You can see the available features from the outputs of SchemaGen. If your features can be used to construct an ML model in Trainer directly, you can skip the next step and go to Step 4. Otherwise you can do some feature engineering work in the next step. The Transform component is needed when full-pass operations like calculating averages are required, especially when you need to scale features.
Step 3. (Optional) Feature engineering with the Transform component.
In this step, you will define various feature engineering jobs which will be used by the Transform component in the pipeline. See the Transform component guide for more information.
This is only needed if your training code requires additional features which are not available in the output of ExampleGen. Otherwise, feel free to fast-forward to the next step of using Trainer.
Define features of the model.
models/features.py contains constants to define the features for the model, including feature names, the size of the vocabulary and so on. By default the penguin template has two constants, FEATURE_KEYS and LABEL_KEY, because our penguin model solves a classification problem using supervised learning and all features are continuous numeric features. See the feature definitions in the Chicago taxi example for another example.
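For reference, the two constants in models/features.py look roughly like the following (the column names follow the Palmer Penguins dataset as used in the TFX penguin examples; treat this as a sketch rather than the verbatim file contents):
# Keys of the continuous numeric features used by the model.
FEATURE_KEYS = [
    'culmen_length_mm',
    'culmen_depth_mm',
    'flipper_length_mm',
    'body_mass_g',
]
# Key of the label column for the supervised classification task.
LABEL_KEY = 'species'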
Implement preprocessing for training/serving in preprocessing_fn().
The actual feature engineering happens in the preprocessing_fn() function in models/preprocessing.py.
In preprocessing_fn you can define a series of functions that manipulate the input dict of tensors to produce the output dict of tensors. There are helper functions like scale_to_0_1 and compute_and_apply_vocabulary in the TensorFlow Transform API, or you can simply use regular TensorFlow functions. By default, the penguin template includes example usages of the tft.scale_to_z_score function to normalize feature values.
See the TensorFlow Transform guide for more information about authoring preprocessing_fn.
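As a minimal sketch of such a function (assuming the FEATURE_KEYS and LABEL_KEY constants shown above; the '_xf' suffix for transformed feature names is an illustrative convention, not necessarily the template's):
import tensorflow_transform as tft

from models import features

def preprocessing_fn(inputs):
  """Callback for tf.Transform: maps a dict of input tensors to output tensors."""
  outputs = {}
  for key in features.FEATURE_KEYS:
    # Full-pass operation: mean and stddev are computed over the whole dataset.
    outputs[key + '_xf'] = tft.scale_to_z_score(inputs[key])
  # Pass the label through unchanged.
  outputs[features.LABEL_KEY] = inputs[features.LABEL_KEY]
  return outputs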
Add the Transform component to the pipeline.
If your preprocessing_fn is ready, add the Transform component to the pipeline.
- In the pipeline/pipeline.py file, uncomment # components.append(transform) to add the component to the pipeline. A sketch of the wiring being uncommented follows below.
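For orientation, the Transform wiring has roughly this shape (a sketch, not the verbatim file: preprocessing_fn is the fully-qualified function name as a string, e.g. 'models.preprocessing.preprocessing_fn', and example_gen / schema_gen are the components defined earlier in create_pipeline()):
transform = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    preprocessing_fn=preprocessing_fn)
components.append(transform)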
You can update the pipeline and run it again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 4 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: 
"my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/4/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:09:37.596944686 5287 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 4 succeeded. 
INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 4 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 5 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={'examples': [Artifact(artifact: id: 4 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/4" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886578210 last_update_time_since_epoch: 1643886578210 , artifact_type: id: 15 name: 
"Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/5/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 5 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 5 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 6 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'statistics': [Artifact(artifact: id: 5 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/5" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886581527 last_update_time_since_epoch: 1643886581527 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:09:37.055994', 
tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/6/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:09:37.055994" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:09:37.055994') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 6 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/6" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:09:37.055994:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 6 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
If the pipeline ran successfully, you should see "Component Transform is finished." somewhere in the log. Because the Transform component and the ExampleValidator component do not depend on each other, the order of their executions is not fixed. That said, either Transform or ExampleValidator may be the last component in the pipeline run.
Examine the output of Transform
The Transform component produces two kinds of output: a TensorFlow graph and transformed examples. The transformed examples are of the Examples artifact type, the same type that ExampleGen produces, but these contain transformed feature values instead.
You can examine them as we did in the previous step.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous run of the Transform component.
  artifacts = get_latest_artifacts(metadata_handler.store,
                                   PIPELINE_NAME, "Transform")
  # Find artifacts of Examples type.
  transformed_examples_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.Examples.TYPE_NAME)
  preview_examples(transformed_examples_artifacts)
Step 4. Train your model with the Trainer component.
We will build an ML model using the Trainer component. See the Trainer component guide for more information. You need to provide your model code to the Trainer component.
Define your model.
In the penguin template, models.model.run_fn is used as the run_fn argument for the Trainer component. This means that the run_fn() function in models/model.py will be called when the Trainer component runs. You can see the code that builds a simple DNN model using the Keras API in the given code. See TensorFlow 2.x in the TFX guide for more information about using the Keras API in TFX.
In this run_fn, you should build a model and save it to the directory pointed to by fn_args.serving_model_dir, which is specified by the component. You can use the other arguments in fn_args that are passed to run_fn. See the related code for the full list of arguments in fn_args.
Define your features in models/features.py and use them as needed. If you transformed your features in Step 3, you should use the transformed features as inputs to your model. A minimal sketch of such a run_fn follows.
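For orientation only, here is a minimal run_fn sketch that trains on transformed examples. The label key, batch size, and layer sizes are assumptions for illustration; the template's models/model.py is the authoritative version.

import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.fn_args_utils import FnArgs

_LABEL_KEY = 'species'  # assumption: the (transformed) label feature name
_BATCH_SIZE = 20

def _input_fn(file_pattern, tf_transform_output, batch_size):
  """Reads transformed TFRecords and yields (features, label) batches."""
  return tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=tf_transform_output.transformed_feature_spec(),
      # Transform materializes examples as gzipped TFRecords by default.
      reader=lambda files: tf.data.TFRecordDataset(
          files, compression_type='GZIP'),
      label_key=_LABEL_KEY)

def run_fn(fn_args: FnArgs):
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)
  train_dataset = _input_fn(fn_args.train_files, tf_transform_output,
                            _BATCH_SIZE)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output,
                           _BATCH_SIZE)

  # A small Keras DNN over the transformed scalar features.
  feature_keys = [k for k in tf_transform_output.transformed_feature_spec()
                  if k != _LABEL_KEY]
  inputs = [tf.keras.layers.Input(shape=(1,), name=key)
            for key in feature_keys]
  x = tf.keras.layers.concatenate(inputs)
  x = tf.keras.layers.Dense(8, activation='relu')(x)
  outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
  model = tf.keras.Model(inputs=inputs, outputs=outputs)
  model.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['sparse_categorical_accuracy'])

  model.fit(train_dataset,
            steps_per_epoch=fn_args.train_steps,
            validation_data=eval_dataset,
            validation_steps=fn_args.eval_steps)

  # Save the model where the Trainer component expects to find it.
  model.save(fn_args.serving_model_dir, save_format='tf')

The essential contract is the final line: however you build the model, it must be saved under fn_args.serving_model_dir so the downstream components can find it.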
Add the Trainer component to the pipeline.
Once your run_fn is ready, add the Trainer component to the pipeline.
- In the pipeline/pipeline.py file, uncomment # components.append(trainer) to add the component to the pipeline.

The arguments for the Trainer component depend on whether you use the Transform component or not.

- If you do NOT use the Transform component, you do not need to change the arguments.
- If you use the Transform component, you need to change the arguments when creating a Trainer component instance:
  - Change the examples argument to examples=transform.outputs['transformed_examples'],. We need to use transformed examples for training.
  - Add the transform_graph argument as transform_graph=transform.outputs['transform_graph'],. This artifact contains the TensorFlow graph for the transform operations.
  - After the changes above, the code that creates the Trainer component will look like the following.

# If you use a Transform component.
trainer = Trainer(
    run_fn=run_fn,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    ...
You can update the pipeline and run it again.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 7 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: 
"my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:01.173700221 5436 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 7 succeeded. 
INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 7 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 8 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 7 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886601629 last_update_time_since_epoch: 1643886601629 , artifact_type: id: 15 name: 
"Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 8 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 8 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 9 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={'statistics': [Artifact(artifact: id: 8 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886605023 last_update_time_since_epoch: 1643886605023 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'exclude_splits': '[]', 'infer_feature_shape': 1}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:00.469382', 
tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/9/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:00.469382" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:00.469382') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 9 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/9" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:00.469382:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 9 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
When this run finishes successfully, you have created and run your first TFX pipeline for your model. Congratulations!
Your new model will be located somewhere under the output directory, but it is better to keep the model in a fixed location or service outside of the TFX pipeline, which holds many intermediate results. Even better is continuous evaluation of the built model, which is critical in production ML systems. We will see how continuous evaluation and deployment work in TFX in the next step.
Step 5. (Optional) Evaluate the model with Evaluator and publish it with Pusher.
The Evaluator component continuously evaluates every model built by Trainer, and Pusher copies the model to a predefined location in the file system or even to Google Cloud AI Platform Models.
Add the Evaluator component to the pipeline.
In the pipeline/pipeline.py file:
- Uncomment # components.append(model_resolver) to add the latest-model resolver to the pipeline. Evaluator can be used to compare a model against an old baseline model that passed Evaluator in the previous pipeline run. LatestBlessedModelResolver finds the latest model that passed Evaluator.
- Set a tfma.MetricsSpec appropriate for your model. Evaluation may be different for every ML model. In the penguin template, SparseCategoricalAccuracy was used because we are solving a multi-category classification problem. You also need to specify tfma.SliceSpec to analyze your model on specific slices. For more details, see the Evaluator component guide. A sketch of such a configuration follows this list.
- Uncomment # components.append(evaluator) to add the component to the pipeline.
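As a point of reference, an Evaluator configuration of the kind described above might look like the sketch below. The label key and the accuracy threshold are illustrative assumptions, not values taken from the template.

import tensorflow_model_analysis as tfma

# A minimal Evaluator configuration sketch (illustrative values only).
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='species')],  # assumed label name
    # An empty SlicingSpec computes metrics over the whole dataset; add
    # feature-based slices to analyze specific subsets.
    slicing_specs=[tfma.SlicingSpec()],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    # Bless the model only if accuracy is above 0.6 (assumed).
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.6})))
        ])
    ])

A model that fails the threshold is not "blessed", and a Pusher wired to the blessing output (see the next section) will skip it.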
You can update the pipeline and run it again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. Pipeline "my_pipeline" updated successfully. CLI Creating a run for pipeline: my_pipeline INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Excluding no splits because exclude_splits is not set. INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "SchemaGen" value { python_class_executable_spec { class_path: "tfx.components.schema_gen.executor.Executor" } } } executor_specs { key: "StatisticsGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.statistics_gen.executor.Executor" } } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 10 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: 
"my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'output_data_format': 6, 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/10/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "/home/kbuilder/imported/my_pipeline/data" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Generating examples. INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. E0203 11:10:24.894390124 5584 fork_posix.cc:70] Fork support is only compatible with the epoll1 and poll polling strategies WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 10 succeeded. 
INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 10 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component StatisticsGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 11 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=11, input_dict={'examples': [Artifact(artifact: id: 10 type_id: 15 uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/10" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1643886522,sum_checksum:1643886522" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:CsvExampleGen:examples:0" } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886625515 last_update_time_since_epoch: 1643886625515 , artifact_type: id: 15 
name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/11/.temp/', pipeline_node=node_info { type { name: "tfx.components.statistics_gen.component.StatisticsGen" base_type: PROCESS } id: "StatisticsGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "statistics" value { artifact_spec { type { name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "SchemaGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Generating statistics for split train. INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-train. INFO:absl:Generating statistics for split eval. INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11/Split-eval. WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 11 succeeded. INFO:absl:Cleaning up stateful execution info. 
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}) for execution 11 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component StatisticsGen is finished. INFO:absl:Component SchemaGen is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 12 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=12, input_dict={'statistics': [Artifact(artifact: id: 11 type_id: 17 uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/11" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:StatisticsGen:statistics:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } state: LIVE create_time_since_epoch: 1643886628941 last_update_time_since_epoch: 1643886628941 , artifact_type: id: 17 name: "ExampleStatistics" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } base_type: STATISTICS )]}, output_dict=defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } , artifact_type: name: "Schema" )]}), exec_properties={'infer_feature_shape': 1, 'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/stateful_working_dir/2022-02-03T11:10:24.358660', 
tmp_dir='./tfx_pipeline_output/my_pipeline/SchemaGen/.system/executor_execution/12/.temp/', pipeline_node=node_info { type { name: "tfx.components.schema_gen.component.SchemaGen" base_type: PROCESS } id: "SchemaGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } contexts { type { name: "node" } name { field_value { string_value: "my_pipeline.SchemaGen" } } } } inputs { inputs { key: "statistics" value { channels { producer_node_query { id: "StatisticsGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "my_pipeline" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-02-03T11:10:24.358660" } } } context_queries { type { name: "node" } name { field_value { string_value: "my_pipeline.StatisticsGen" } } } artifact_query { type { name: "ExampleStatistics" base_type: STATISTICS } } output_key: "statistics" } min_count: 1 } } } outputs { outputs { key: "schema" value { artifact_spec { type { name: "Schema" } } } } } parameters { parameters { key: "exclude_splits" value { field_value { string_value: "[]" } } } parameters { key: "infer_feature_shape" value { field_value { int_value: 1 } } } } upstream_nodes: "StatisticsGen" execution_options { caching_options { } } , pipeline_info=id: "my_pipeline" , pipeline_run_id='2022-02-03T11:10:24.358660') INFO:absl:Processing schema from statistics for split train. INFO:absl:Processing schema from statistics for split eval. INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12/schema.pbtxt. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 12 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'schema': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/SchemaGen/schema/12" custom_properties { key: "name" value { string_value: "my_pipeline:2022-02-03T11:10:24.358660:SchemaGen:schema:0" } } custom_properties { key: "tfx_version" value { string_value: "1.6.0" } } , artifact_type: name: "Schema" )]}) for execution 12 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component SchemaGen is finished.
Examine the output of Evaluator
This step requires the TensorFlow Model Analysis (TFMA) Jupyter notebook extension. Note that the version of the TFMA notebook extension must be identical to the version of the TFMA python package.
The following command installs the TFMA notebook extension from the NPM registry. It can take several minutes to complete.
# Install TFMA notebook extension.
!jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir] [--paths] [--json] [--debug] [subcommand] Jupyter: Interactive Computing positional arguments: subcommand the subcommand to launch optional arguments: -h, --help show this help message and exit --version show the versions of core jupyter packages and exit --config-dir show Jupyter config dir --data-dir show Jupyter data dir --runtime-dir show Jupyter runtime dir --paths show all Jupyter paths. Add --json for machine-readable format. --json output paths as machine-readable json --debug output debug information about paths Available subcommands: bundlerextension console dejavu execute kernel kernelspec migrate nbconvert nbextension notebook qtconsole run serverextension troubleshoot trust Jupyter command `jupyter-labextension` not found.
If the installation completes, reload your browser so that the extension takes effect.
with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search all artifacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ModelEvaluation.TYPE_NAME)
  if model_evaluation_artifacts:
    tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
    tfma.view.render_slicing_metrics(tfma_result)
Add the Pusher component to the pipeline.
If the model looks promising, we need to publish it. The Pusher component can publish the model to a location in the file system or to GCP AI Platform Models using a custom executor.
- In local_runner.py, set SERVING_MODEL_DIR to a directory to publish the model to.
- In the pipeline/pipeline.py file, uncomment # components.append(pusher) to add Pusher to the pipeline. A sketch of the resulting wiring appears after this list.
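For orientation, the Pusher wiring inside pipeline.py looks roughly like this sketch. It assumes trainer and evaluator are the component instances created earlier in pipeline.py, and the SERVING_MODEL_DIR value shown here is an illustrative placeholder for the one set in local_runner.py.

import os
from tfx.components import Pusher
from tfx.proto import pusher_pb2

# Illustrative placeholder; in the template this is set in local_runner.py.
SERVING_MODEL_DIR = os.path.join('serving_model', 'my_pipeline')

# Push the trained model only if Evaluator blessed it.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=SERVING_MODEL_DIR)))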
You can update the pipeline and run it again.
# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
&& tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI Creating a run for pipeline: my_pipeline
INFO:absl:Using deployment config: executor_specs { ... } metadata_connection_config { database_connection_config { sqlite { filename_uri: "./tfx_metadata/my_pipeline/metadata.db" connection_mode: READWRITE_OPENCREATE } } }
INFO:absl:Component CsvExampleGen is running.
INFO:absl:Going to run a new execution 13
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Execution 13 succeeded.
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Going to run a new execution 14
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/14/Split-eval.
INFO:absl:Execution 14 succeeded.
INFO:absl:Component StatisticsGen is finished.
INFO:absl:Component SchemaGen is running.
INFO:absl:Going to run a new execution 15
INFO:absl:Processing schema from statistics for split train.
INFO:absl:Processing schema from statistics for split eval.
INFO:absl:Schema written to ./tfx_pipeline_output/my_pipeline/SchemaGen/schema/15/schema.pbtxt.
INFO:absl:Execution 15 succeeded.
INFO:absl:Component SchemaGen is finished.
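If you want to confirm what the run produced, you can query the ML Metadata store directly. A minimal sketch, assuming the metadata.db path shown in the logs above:
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2
# Connect to the SQLite metadata store used by the local runs above.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = './tfx_metadata/my_pipeline/metadata.db'
connection_config.sqlite.connection_mode = metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)
# Print every artifact recorded so far (Examples, statistics, schema, ...).
for artifact in store.get_artifacts():
    print(artifact.type_id, artifact.uri)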
You should be able to find your new model in SERVING_MODEL_DIR.
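For a quick check, you can list that directory. A minimal sketch, assuming the template default of serving_model/<pipeline name> used by local_runner.py (verify the value in your copy):
import os
# Assumed default from the template's local_runner.py -- adjust if yours differs.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
# Each numbered sub-directory holds one exported SavedModel version.
!ls -R {SERVING_MODEL_DIR}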
Step 6. (Optional) Deploy your pipeline to Kubeflow Pipelines on GCP.
As mentioned earlier, local_runner.py is good for debugging and development, but it is not the best solution for production workloads. In this step, we will deploy the pipeline to Kubeflow Pipelines on Google Cloud.
Preparation
We need the kfp Python package and the skaffold program to deploy a pipeline to a Kubeflow Pipelines cluster.
pip install --upgrade -q kfp
# Download skaffold and set it executable.
curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold
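If you want to confirm the download succeeded before moving the binary, skaffold can report its own version:
# Print the downloaded release's version to confirm the binary works.
./skaffold version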
You need to move the skaffold binary to a place your shell can find it. Alternatively, you can specify the path to skaffold with the --skaffold-cmd flag when you run the tfx binary.
# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory
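The mv above fails when the target directory does not yet exist. A minimal fix, assuming you want the binary under /home/jupyter/.local/bin as in the command above:
# Create the target directory first, then retry the move and verify
# that the shell can now resolve skaffold.
mkdir -p /home/jupyter/.local/bin
mv skaffold /home/jupyter/.local/bin/
which skaffold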
You also need a Kubeflow Pipelines cluster to run the pipeline. Follow steps 1 and 2 of the TFX on Cloud AI Platform Pipelines tutorial.
When your cluster is ready, open the pipelines dashboard by clicking Open Pipelines Dashboard on the Pipelines page of the Google Cloud console. The URL of that page is the ENDPOINT used to request pipeline runs. The endpoint value is everything in the URL after https://, up to and including googleusercontent.com. Put your endpoint in the code block below.
ENDPOINT='' # Enter your ENDPOINT here.
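For example, if the dashboard URL were https://1a2b3c4d5e6f-dot-us-central1.pipelines.googleusercontent.com/#/pipelines (a purely hypothetical value), the endpoint would be set as follows:
# Hypothetical example -- substitute the host from your own dashboard URL.
ENDPOINT='1a2b3c4d5e6f-dot-us-central1.pipelines.googleusercontent.com'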
To run our code on a Kubeflow Pipelines cluster, we need to package it into a container image. The image will be built automatically while deploying the pipeline; you only need to set a name and a container registry for it. In our example, we will use Google Container Registry and name the image tfx-pipeline.
# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'
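It can be worth printing the resolved values before building, to catch an empty project id early:
# Quick sanity check: both values should be non-empty and well-formed.
print('GCP project id: ' + GOOGLE_CLOUD_PROJECT)
print('Pipeline image: ' + CUSTOM_TFX_IMAGE)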
Set the data location.
Your data must be accessible from the Kubeflow Pipelines cluster. If you have been using data in your local environment, you may need to upload it to remote storage such as Google Cloud Storage. For example, we can upload the penguin data to the default bucket that is created automatically when a Kubeflow Pipelines cluster is deployed, as shown below.
gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]... NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted
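The copy above fails when the default bucket has not been created yet. If you hit the same 404, you can create the bucket first (the bucket name follows the default <project>-kubeflowpipelines-default naming used above) and retry:
# Create the default bucket, then retry the upload.
gsutil mb gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default
gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/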
Update the data location stored in DATA_PATH in kubeflow_runner.py.
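A sketch of the kind of edit, using the upload destination from the gsutil command above (the exact variable layout in kubeflow_runner.py may differ by TFX version, and the template usually derives the bucket name from its configs module):
# In kubeflow_runner.py: point DATA_PATH at the GCS prefix used above.
DATA_PATH = 'gs://{}-kubeflowpipelines-default/tfx-template/data/penguin/'.format(GOOGLE_CLOUD_PROJECT)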
If you use BigQueryExampleGen, there is no need to upload the data file, but make sure that kubeflow_runner.py passes the same query and beam_pipeline_args arguments to the pipeline.create_pipeline() function.
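A minimal sketch of what that consistency means; the query and variable names here are illustrative, not the template's exact contents:
# Both local_runner.py and kubeflow_runner.py should pass the same values.
# (Hypothetical query; the template keeps such settings in pipeline/configs.py.)
QUERY = 'SELECT * FROM `your-project.your_dataset.penguins`'
BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=gs://' + GOOGLE_CLOUD_PROJECT + '-kubeflowpipelines-default/tmp',
]
# Passed identically in both runners:
# pipeline.create_pipeline(..., query=QUERY, beam_pipeline_args=BEAM_PIPELINE_ARGS)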
Deploy the pipeline.
If everything is ready, you can create a pipeline using the tfx pipeline create command.
!tfx pipeline create \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
CLI [Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.
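The error message itself describes the fix: specify the target image via the KubeflowDagRunnerConfig class in kubeflow_runner.py and pass --build-image without an argument. A sketch following that message (the tfx_image field shown in the comment is an assumption; check your kubeflow_runner.py):
# In kubeflow_runner.py (sketch):
#   runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
#       tfx_image=CUSTOM_TFX_IMAGE, ...)
!tfx pipeline create \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-image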
Now start an execution run with the newly created pipeline using the tfx run create command.
tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI Creating a run for pipeline: my_pipeline
Failed to load kube config.
Traceback (most recent call last):
  ...
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  ...
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff729e34190>: Failed to establish a new connection: [Errno 111] Connection refused'))
(The run above failed because ENDPOINT was left empty in this environment, so the client tried to reach a Kubeflow Pipelines API on localhost.) Alternatively, you can also run the pipeline from the Kubeflow Pipelines dashboard. The new run will be listed under Experiments in the Kubeflow Pipelines dashboard. Clicking the experiment lets you monitor progress and visualize the artifacts created during the run.
If you are interested in running your pipeline on Kubeflow Pipelines, find more instructions in the TFX on Cloud AI Platform Pipelines tutorial.
Cleaning up
To clean up all the Google Cloud resources used in this step, you can delete the Google Cloud project you used for the tutorial.
Alternatively, you can clean up individual resources by visiting each console: