
Create a TFX pipeline for your data with the Penguin template


Introduction

This document provides instructions for creating a TensorFlow Extended (TFX) pipeline for your own dataset using the penguin template, which is provided with the TFX Python package. The created pipeline will use the Palmer Penguins dataset initially, but we will transform the pipeline for your dataset.

Prerequisites

  • Linux/Mac OS
  • Python 3.6-3.8
  • Jupyter notebook

Step 1. Copy the predefined template to your project directory.

In this step, we will create a working pipeline project directory and files by copying the penguin template files in TFX. You can think of this as a scaffold for your TFX pipeline project.

Install the required packages

First, install the tfx Python package. We pin the versions of tfx and tensorflow-model-analysis to make sure that we have the NPM dependencies required for visualization.

# Install tfx Python package.
import sys
!{sys.executable} -m pip install --upgrade pip

# TODO(ccy): Fix need to use deprecated pip resolver.
!{sys.executable} -m pip install --upgrade --use-deprecated=legacy-resolver \
  tfx==0.25.0 tensorflow-model-analysis==0.25.0

Let's check the versions of TFX.

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx

print('TF version: {}'.format(tf.__version__))
print('TFMA version: {}'.format(tfma.__version__))
print('TFX version: {}'.format(tfx.__version__))
TF version: 2.5.1
TFMA version: 0.33.0
TFX version: 1.2.0

We are ready to create a pipeline.

Set PROJECT_DIR to an appropriate destination for your environment. The default value is ~/imported/${PIPELINE_NAME}, which is appropriate for the Google Cloud Platform AI Notebook environment.

You can give your pipeline a different name by changing PIPELINE_NAME below. This will also become the name of the project directory where your files are placed.

PIPELINE_NAME="my_pipeline"
import os
# Set this project directory to your new tfx pipeline project.
PROJECT_DIR=os.path.join(os.path.expanduser("~"), "imported", PIPELINE_NAME)

Copy the template files.

TFX includes the penguin template with the TFX Python package. The penguin template contains many instructions for bringing your dataset into the pipeline, which is the purpose of this tutorial.

The tfx template copy CLI command copies the predefined template files into your project directory.

# Set `PATH` to include user python binary directory and a directory containing `skaffold`.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=penguin
env: PATH=/tmpfs/src/tf_docs_env/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/opt/puppetlabs/bin:/opt/android-studio/current/bin:/usr/local/go/bin:/usr/local/go/packages/bin:/opt/kubernetes/client/bin/:/home/kbuilder/.local/bin:/home/jupyter/.local/bin
CLI
Copying penguin pipeline template
model_test.py -> /home/kbuilder/imported/my_pipeline/models/model_test.py
constants.py -> /home/kbuilder/imported/my_pipeline/models/constants.py
model.py -> /home/kbuilder/imported/my_pipeline/models/model.py
__init__.py -> /home/kbuilder/imported/my_pipeline/models/__init__.py
preprocessing.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing.py
features_test.py -> /home/kbuilder/imported/my_pipeline/models/features_test.py
features.py -> /home/kbuilder/imported/my_pipeline/models/features.py
preprocessing_test.py -> /home/kbuilder/imported/my_pipeline/models/preprocessing_test.py
__init__.py -> /home/kbuilder/imported/my_pipeline/__init__.py
__init__.py -> /home/kbuilder/imported/my_pipeline/pipeline/__init__.py
configs.py -> /home/kbuilder/imported/my_pipeline/pipeline/configs.py
pipeline.py -> /home/kbuilder/imported/my_pipeline/pipeline/pipeline.py
kubeflow_runner.py -> /home/kbuilder/imported/my_pipeline/kubeflow_runner.py
local_runner.py -> /home/kbuilder/imported/my_pipeline/local_runner.py

Change the working directory context of this notebook to the project directory.

%cd {PROJECT_DIR}
/home/kbuilder/imported/my_pipeline

Browse your copied source files

The TFX template provides basic scaffold files to build a pipeline, including Python source code and sample data. The penguin template uses the same Palmer Penguins dataset and ML model as the Penguin example.

Here is a brief introduction to each of the Python files.

  • pipeline - This directory contains the definition of the pipeline
    • configs.py - defines common constants for pipeline runners
    • pipeline.py - defines TFX components and a pipeline
  • models - This directory contains ML model definitions
    • features.py, features_test.py - defines features for the model
    • preprocessing.py, preprocessing_test.py - defines data preprocessing routines
    • constants.py - defines constants of the model
    • model.py, model_test.py - defines the ML model using frameworks like TensorFlow
  • local_runner.py - defines a runner for the local environment, which uses the local orchestration engine (see the sketch below)
  • kubeflow_runner.py - defines a runner for the Kubeflow Pipelines orchestration engine
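
As a rough orientation, a minimal local runner in the spirit of local_runner.py could look like the sketch below, assuming the layout above. The create_pipeline() arguments shown are illustrative placeholders, not the template's exact signature.

from tfx.orchestration.local.local_dag_runner import LocalDagRunner

from pipeline import configs
from pipeline import pipeline


def run():
  # Build the pipeline definition and execute it with the local engine.
  LocalDagRunner().run(
      pipeline.create_pipeline(
          pipeline_name=configs.PIPELINE_NAME,
          # Data path, metadata connection config and other arguments are
          # elided here; see the copied local_runner.py for the full list.
      ))


if __name__ == '__main__':
  run()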

By default, the template only includes standard TFX components. If you need custom actions, you can create custom components for your pipeline. Please see the TFX custom component guide for details.
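
For orientation, a minimal Python-function custom component could look like the sketch below. The decorator API lives under an experimental namespace, and the component name and behavior here (LogExamplesPath) are hypothetical.

from tfx.dsl.component.experimental.annotations import InputArtifact
from tfx.dsl.component.experimental.decorators import component
from tfx.types.standard_artifacts import Examples


@component
def LogExamplesPath(examples: InputArtifact[Examples]) -> None:
  # Hypothetical custom component: print where the upstream Examples
  # artifact is stored.
  print('Examples artifact stored at: {}'.format(examples.uri))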

Unit test files.

You may notice that there are some files with _test.py in their name. These are unit tests of the pipeline, and it is recommended to add more unit tests as you implement your own pipelines. You can run unit tests by supplying the module name of test files with the -m flag. You can usually get a module name by removing the .py extension and replacing / with a dot. For example:

import sys
!{sys.executable} -m models.features_test
Running tests under Python 3.7.5: /tmpfs/src/tf_docs_env/bin/python
[ RUN      ] FeaturesTest.testLabelKey
INFO:tensorflow:time(__main__.FeaturesTest.testLabelKey): 0.0s
I0930 02:13:49.520252 139764034877248 test_util.py:2103] time(__main__.FeaturesTest.testLabelKey): 0.0s
[       OK ] FeaturesTest.testLabelKey
[ RUN      ] FeaturesTest.test_session
[  SKIPPED ] FeaturesTest.test_session
----------------------------------------------------------------------
Ran 2 tests in 0.001s

OK (skipped=1)

Create a TFX pipeline in a local environment.

TFX supports several orchestration engines to run pipelines. We will use the local orchestration engine. The local orchestration engine runs without any further dependencies, and it is suitable for development and debugging because it runs on the local environment rather than depending on remote computing clusters.

We will use local_runner.py to run your pipeline with the local orchestrator. You have to create a pipeline before running it. You can create a pipeline with the pipeline create command.

tfx pipeline create --engine=local --pipeline_path=local_runner.py
CLI
Creating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" created successfully.

The pipeline create command registers your pipeline defined in local_runner.py without actually running it.

You will run the created pipeline with the run create command in the following steps.

Step 2. Ingest YOUR data into the pipeline.

The initial pipeline ingests the penguin dataset which is included in the template. You need to put your data into the pipeline, and most TFX pipelines start with the ExampleGen component.

Choose an ExampleGen

Your data can be stored anywhere your pipeline can access, on a local or distributed filesystem, or on a query-able system. TFX provides various ExampleGen components to bring your data into a TFX pipeline. You can choose one from the following ExampleGen components.

You can also create your own ExampleGen; for example, TFX includes a custom ExampleGen which uses Presto as a data source. See the guide for more information on how to use and develop custom executors.

Once you decide which ExampleGen to use, you will need to modify the pipeline definition to use your data.

  1. Modify DATA_PATH in local_runner.py and set it to the location of your files.

    • If you have files in a local environment, specify the path. This is the best option for developing or debugging a pipeline.
    • If the files are stored in GCS, you can use a path starting with gs://{bucket_name}/... . Please make sure that you can access GCS from your terminal, for example, using gsutil. Please follow the authorization guide in Google Cloud if needed.
    • If you want to use a query-based ExampleGen like BigQueryExampleGen, you need a query statement to select data from the data source. There are a few more things you need to set to use Google Cloud BigQuery as a data source.
    • In pipeline/configs.py:
      • Change GOOGLE_CLOUD_PROJECT and GCS_BUCKET_NAME to your GCP project and bucket name. The bucket should exist before we run the pipeline.
      • Uncomment the BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS variable.
      • Uncomment and set the BIG_QUERY_QUERY variable to your query statement.
    • In local_runner.py:
      • Comment out the data_path argument and uncomment the query argument in pipeline.create_pipeline().
    • In pipeline/pipeline.py:
      • Comment out the data_path argument and uncomment the query argument in create_pipeline().
      • Use BigQueryExampleGen instead of CsvExampleGen.
  2. Replace the existing CsvExampleGen with your ExampleGen class in pipeline/pipeline.py, as sketched after this list. Each ExampleGen class has a different signature. Please see the ExampleGen component guide for more details. Don't forget to import the required modules with import statements in pipeline/pipeline.py.
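
As a concrete illustration of the swap in item 2, the change in pipeline/pipeline.py could look roughly like the sketch below; the import path assumes TFX 1.x, and the query string is a placeholder.

from tfx.v1.extensions.google_cloud_big_query import BigQueryExampleGen

# Placeholder query; in the template this would come from the uncommented
# BIG_QUERY_QUERY variable in pipeline/configs.py.
query = 'SELECT * FROM `my_gcp_project.my_dataset.penguins`'

# Replaces the file-based component, e.g.:
#   example_gen = CsvExampleGen(input_base=data_path)
example_gen = BigQueryExampleGen(query=query)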

The initial pipeline consists of four components: ExampleGen, StatisticsGen, SchemaGen and ExampleValidator. We don't need to change anything for StatisticsGen, SchemaGen and ExampleValidator. Let's run the pipeline for the first time.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Running pipeline:
 pipeline_info {
  id: "my_pipeline"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
      }
      id: "CsvExampleGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:06.996966"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.CsvExampleGen"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "examples"
        value {
          artifact_spec {
            type {
              name: "Examples"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
              properties {
                key: "version"
                value: INT
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "input_base"
        value {
          field_value {
            string_value: "/home/kbuilder/imported/my_pipeline/data"
          }
        }
      }
      parameters {
        key: "input_config"
        value {
          field_value {
            string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
          }
        }
      }
      parameters {
        key: "output_config"
        value {
          field_value {
            string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
          }
        }
      }
      parameters {
        key: "output_data_format"
        value {
          field_value {
            int_value: 6
          }
        }
      }
      parameters {
        key: "output_file_format"
        value {
          field_value {
            int_value: 5
          }
        }
      }
    }
    downstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.statistics_gen.component.StatisticsGen"
      }
      id: "StatisticsGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:06.996966"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.StatisticsGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "examples"
        value {
          channels {
            producer_node_query {
              id: "CsvExampleGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:06.996966"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.CsvExampleGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Examples"
              }
            }
            output_key: "examples"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "statistics"
        value {
          artifact_spec {
            type {
              name: "ExampleStatistics"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "CsvExampleGen"
    downstream_nodes: "ExampleValidator"
    downstream_nodes: "SchemaGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.schema_gen.component.SchemaGen"
      }
      id: "SchemaGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:06.996966"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.SchemaGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:06.996966"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "schema"
        value {
          artifact_spec {
            type {
              name: "Schema"
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
      parameters {
        key: "infer_feature_shape"
        value {
          field_value {
            int_value: 1
          }
        }
      }
    }
    upstream_nodes: "StatisticsGen"
    downstream_nodes: "ExampleValidator"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_validator.component.ExampleValidator"
      }
      id: "ExampleValidator"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:06.996966"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.ExampleValidator"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "schema"
        value {
          channels {
            producer_node_query {
              id: "SchemaGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:06.996966"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.SchemaGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Schema"
              }
            }
            output_key: "schema"
          }
        }
      }
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:06.996966"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "anomalies"
        value {
          artifact_spec {
            type {
              name: "ExampleAnomalies"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "SchemaGen"
    upstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
runtime_spec {
  pipeline_root {
    field_value {
      string_value: "./tfx_pipeline_output/my_pipeline"
    }
  }
  pipeline_run_id {
    field_value {
      string_value: "2021-09-30T02:14:06.996966"
    }
  }
}
execution_mode: SYNC
deployment_config {
  type_url: "type.googleapis.com/tfx.orchestration.IntermediateDeploymentConfig"
  value: "\n\216\001\n\tSchemaGen\022\200\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\022-\n+tfx.components.schema_gen.executor.Executor\n\220\001\n\rStatisticsGen\022\177\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\0223\n1\n/tfx.components.statistics_gen.executor.Executor\n\236\001\n\rCsvExampleGen\022\214\001\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\022@\n>\n<tfx.components.example_gen.csv_example_gen.executor.Executor\n\234\001\n\020ExampleValidator\022\207\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0224\n2tfx.components.example_validator.executor.Executor\022\230\001\n\rCsvExampleGen\022\206\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0223\n1tfx.components.example_gen.driver.FileBasedDriver*`\n0type.googleapis.com/ml_metadata.ConnectionConfig\022,\032*\n&./tfx_metadata/my_pipeline/metadata.db\020\003"
}

INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "ExampleValidator"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_validator.executor.Executor"
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  sqlite {
    filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
    connection_mode: READWRITE_OPENCREATE
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:06.996966"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
WARNING: Logging before InitGoogleLogging() is written to STDERR
I0930 02:14:07.012264 18855 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:14:07.018623 18855 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:14:07.024567 18855 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:14:07.030488 18855 rdbms_metadata_access_object.cc:686] No property is defined for the Type
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
I0930 02:14:07.041599 18855 rdbms_metadata_access_object.cc:686] No property is defined for the Type
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:06.996966:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}), exec_properties={'output_data_format': 6, 'output_file_format': 5, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2021-09-30T02:14:06.996966', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:06.996966"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:06.996966')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:06.996966:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}) for execution 1
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:06.996966"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:06.996966"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
I0930 02:14:08.044364 18855 rdbms_metadata_access_object.cc:686] No property is defined for the Type
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 2
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/1"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:06.996966:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
state: LIVE
create_time_since_epoch: 1632968048030
last_update_time_since_epoch: 1632968048030
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:06.996966:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2021-09-30T02:14:06.996966', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/2/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:06.996966"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:06.996966"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:06.996966')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/2/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 2 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "local_runner.py", line 78, in <module>
    run()
  File "local_runner.py", line 73, in run
    .sqlite_metadata_connection_config(METADATA_PATH)))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/local/local_dag_runner.py", line 91, in run
    component_launcher.launch()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 482, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 358, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 89, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 55, in run_with_executor
    execution_info.exec_properties)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/components/statistics_gen/executor.py", line 142, in Do
    output_uri)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 585, in __exit__
    self.result = self.run()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 196, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 385, in run_stages
    runner_execution_context, bundle_context_manager)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 653, in _run_stage
    bundle_manager))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1080, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
    response = self.worker.do_instruction(request)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 1071, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1074, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1089, in apache_beam.runners.worker.operations.PGBKCVOperation.output_key
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 757, in compact
    self._maybe_do_batch(accumulator, force=True)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 690, in _maybe_do_batch
    accumulator.partial_accumulators)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 649, in _for_each_generator
    self._generators, zip(*args))]
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 648, in <listcomp>
    return [func(gen, *args_for_func) for gen, args_for_func in zip(
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 689, in <lambda>
    lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 1026, in add_input
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 232, in update
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 128, in update
    num_values_grouped = pa.array(num_values_not_none).value_counts()
  File "pyarrow/array.pxi", line 292, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 79, in pyarrow.lib._ndarray_to_array
  File "pyarrow/array.pxi", line 67, in pyarrow.lib._ndarray_to_type
  File "pyarrow/error.pxi", line 107, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object
Error while running "/usr/bin/python3.7 local_runner.py"

You should see "Component ExampleValidator is finished." if the pipeline ran successfully.

Examine the output of the pipeline.

A TFX pipeline produces two kinds of output: artifacts and a metadata DB (MLMD) which contains metadata of artifacts and pipeline executions. The location of the output is defined in local_runner.py. By default, artifacts are stored under the tfx_pipeline_output directory and metadata is stored as an SQLite database under the tfx_metadata directory.

You can use MLMD APIs to examine these outputs. First, let's define some utility functions to search for the output artifacts that were just produced.

import tensorflow as tf
import tfx
from ml_metadata import errors
from ml_metadata.proto import metadata_store_pb2
from tfx.types import artifact_utils

# TODO(b/171447278): Move these functions into TFX library.

def get_latest_executions(store, pipeline_name, component_id = None):
  """Fetch all pipeline runs."""
  if component_id is None:  # Find entire pipeline runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('run')
        if c.properties['pipeline_name'].string_value == pipeline_name
    ]
  else:  # Find specific component runs.
    run_contexts = [
        c for c in store.get_contexts_by_type('component_run')
        if c.properties['pipeline_name'].string_value == pipeline_name and
           c.properties['component_id'].string_value == component_id
    ]
  if not run_contexts:
    return []
  # Pick the latest run context.
  latest_context = max(run_contexts,
                       key=lambda c: c.last_update_time_since_epoch)
  return store.get_executions_by_context(latest_context.id)

def get_latest_artifacts(store, pipeline_name, component_id = None):
  """Fetch all artifacts from latest pipeline execution."""
  executions = get_latest_executions(store, pipeline_name, component_id)

  # Fetch all artifacts produced from the given executions.
  execution_ids = [e.id for e in executions]
  events = store.get_events_by_execution_ids(execution_ids)
  artifact_ids = [
      event.artifact_id for event in events
      if event.type == metadata_store_pb2.Event.OUTPUT
  ]
  return store.get_artifacts_by_id(artifact_ids)

def find_latest_artifacts_by_type(store, artifacts, artifact_type):
  """Get the latest artifacts of a specified type."""
  # Get type information from MLMD
  try:
    artifact_type = store.get_artifact_type(artifact_type)
  except errors.NotFoundError:
    return []
  # Filter artifacts with type.
  filtered_artifacts = [artifact for artifact in artifacts
                        if artifact.type_id == artifact_type.id]
  # Convert MLMD artifact data into TFX Artifact instances.
  return [artifact_utils.deserialize_artifact(artifact_type, artifact)
      for artifact in filtered_artifacts]


from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

import pprint

from tfx.orchestration import metadata
from tfx.types import artifact_utils
from tfx.types import standard_artifacts

def preview_examples(artifacts):
  """Preview a few records from Examples artifacts."""
  pp = pprint.PrettyPrinter()
  for artifact in artifacts:
    print("==== Examples artifact:{}({})".format(artifact.name, artifact.uri))
    for split in artifact_utils.decode_split_names(artifact.split_names):
      print("==== Reading from split:{}".format(split))
      split_uri = artifact_utils.get_split_uri([artifact], split)

      # Get the list of files in this directory (all compressed TFRecord files)
      tfrecord_filenames = [os.path.join(split_uri, name)
                            for name in os.listdir(split_uri)]
      # Create a `TFRecordDataset` to read these files
      dataset = tf.data.TFRecordDataset(tfrecord_filenames,
                                        compression_type="GZIP")
      # Iterate over the first 2 records and decode them.
      for tfrecord in dataset.take(2):
        serialized_example = tfrecord.numpy()
        example = tf.train.Example()
        example.ParseFromString(serialized_example)
        pp.pprint(example)

import local_runner

metadata_connection_config = metadata.sqlite_metadata_connection_config(
              local_runner.METADATA_PATH)

Now we can read the metadata of output artifacts from MLMD.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all artifacts from the previous pipeline run.
    artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
    # Find artifacts of Examples type.
    examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Examples.TYPE_NAME)
    # Find artifacts generated from StatisticsGen.
    stats_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleStatistics.TYPE_NAME)
    # Find artifacts generated from SchemaGen.
    schema_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Schema.TYPE_NAME)
    # Find artifacts generated from ExampleValidator.
    anomalies_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.ExampleAnomalies.TYPE_NAME)

Now we can examine the outputs from each component. TensorFlow Data Validation (TFDV) is used in StatisticsGen, SchemaGen and ExampleValidator, and TFDV can be used to visualize the outputs from these components.

In this tutorial, we will use visualization helper methods in TFX which use TFDV internally to show the visualization. Please see the TFX components tutorial to learn more about each component.

Examine the output of ExampleGen

Let's examine the output from ExampleGen. Take a look at the first two examples for each split:

preview_examples(examples_artifacts)

By default, TFX ExampleGen divides examples into two splits, train and eval, but you can adjust your split configuration, as in the sketch below.
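
For example, a 3:1 train/eval split (instead of the template's default 2:1) can be configured through output_config. This is a sketch using the standard example_gen_pb2 protos, with 'data' as a placeholder input path.

from tfx.components import CsvExampleGen
from tfx.proto import example_gen_pb2

# Hash-bucket split: 3 buckets for train, 1 for eval.
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
    ]))
example_gen = CsvExampleGen(input_base='data', output_config=output_config)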

Examine the output of StatisticsGen

visualize_artifacts(stats_artifacts)

These statistics are supplied to SchemaGen to construct a schema of the data automatically.

Examine the output of SchemaGen

visualize_artifacts(schema_artifacts)

This schema is automatically inferred from the output of StatisticsGen. We will use this generated schema in this tutorial, but you can also modify and customize the schema.
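
If you decide to curate the schema, a sketch like the following can serve as a starting point; the file paths and the domain bounds are hypothetical, so adapt them to your Schema artifact URI and your data.

import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

# Load the inferred schema, tighten one numeric feature, and save a curated
# copy. Paths and bounds here are hypothetical.
schema = tfdv.load_schema_text('schema.pbtxt')
tfdv.set_domain(schema, 'body_mass_g',
                schema_pb2.FloatDomain(min=2000.0, max=7000.0))
tfdv.write_schema_text(schema, 'curated_schema.pbtxt')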

Examine the output of ExampleValidator

visualize_artifacts(anomalies_artifacts)

If any anomalies were found, you may review your data so that all examples follow your assumptions. Outputs from other components like StatisticsGen might be useful. Detected anomalies don't block the pipeline execution.

You can see the available features from the outputs of SchemaGen. If your features can be used to construct the ML model in Trainer directly, you can skip the next step and go to Step 4. Otherwise you can do some feature engineering work in the next step. The Transform component is needed when full-pass operations like computing averages are required, especially when you need to scale features.

Step 3. (Optional) Feature engineering with the Transform component.

In this step, you will define various feature engineering jobs which will be used by the Transform component in the pipeline. See the Transform component guide for more information.

This is only needed if your training code requires additional features which are not available in the output of ExampleGen. Otherwise, feel free to fast-forward to the next step of using Trainer.

Define features of the model

models/features.py contains constants to define features for the model, including feature names, the size of vocabularies and so on. By default the penguin template has two constants, FEATURE_KEYS and LABEL_KEY, because our penguin model solves a classification problem using supervised learning and all features are continuous numeric features. See the feature definitions from the Chicago taxi example for another example.
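
For reference, the default models/features.py is roughly of the following shape; the exact keys may differ between template versions, so check the copied file.

# Approximate shape of models/features.py in the penguin template.
FEATURE_KEYS = [
    'culmen_length_mm',
    'culmen_depth_mm',
    'flipper_length_mm',
    'body_mass_g',
]
LABEL_KEY = 'species'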

Implement preprocessing for training/serving in preprocessing_fn().

Actual feature engineering happens in the preprocessing_fn() function in models/preprocessing.py.

In preprocessing_fn you can define a series of functions that manipulate the input dict of tensors to produce the output dict of tensors. There are helper functions like scale_to_0_1 and compute_and_apply_vocabulary in the TensorFlow Transform API, or you can simply use regular TensorFlow functions. By default the penguin template includes example usages of the tft.scale_to_z_score function to normalize feature values.

See the TensorFlow Transform guide for more information about authoring preprocessing_fn.
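
A minimal sketch of such a preprocessing_fn, assuming the FEATURE_KEYS and LABEL_KEY constants from models/features.py and the z-score scaling that the template uses:

# models/preprocessing.py (sketch)
import tensorflow_transform as tft

from models import features


def preprocessing_fn(inputs):
  """tf.Transform callback: maps raw feature tensors to transformed ones."""
  outputs = {}
  for key in features.FEATURE_KEYS:
    # Full-pass operation: tft computes mean/stddev over the whole dataset.
    outputs[key] = tft.scale_to_z_score(inputs[key])
  # Pass the label through unchanged.
  outputs[features.LABEL_KEY] = inputs[features.LABEL_KEY]
  return outputs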

Add the Transform component to the pipeline.

If your preprocessing_fn is ready, add the Transform component to the pipeline.

  1. In the pipeline/pipeline.py file, uncomment # components.append(transform) to add the component to the pipeline, as sketched below.
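
For context, the transform component appended there is wired roughly as in this sketch; the template actually passes the preprocessing function as a module path string:

from tfx.components import Transform

# Sketch of the Transform wiring in pipeline/pipeline.py.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    preprocessing_fn='models.preprocessing.preprocessing_fn')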

You can update the pipeline and run it again.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Running pipeline:
 pipeline_info {
  id: "my_pipeline"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
      }
      id: "CsvExampleGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:23.338846"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.CsvExampleGen"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "examples"
        value {
          artifact_spec {
            type {
              name: "Examples"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
              properties {
                key: "version"
                value: INT
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "input_base"
        value {
          field_value {
            string_value: "/home/kbuilder/imported/my_pipeline/data"
          }
        }
      }
      parameters {
        key: "input_config"
        value {
          field_value {
            string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
          }
        }
      }
      parameters {
        key: "output_config"
        value {
          field_value {
            string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
          }
        }
      }
      parameters {
        key: "output_data_format"
        value {
          field_value {
            int_value: 6
          }
        }
      }
      parameters {
        key: "output_file_format"
        value {
          field_value {
            int_value: 5
          }
        }
      }
    }
    downstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.statistics_gen.component.StatisticsGen"
      }
      id: "StatisticsGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:23.338846"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.StatisticsGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "examples"
        value {
          channels {
            producer_node_query {
              id: "CsvExampleGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:23.338846"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.CsvExampleGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Examples"
              }
            }
            output_key: "examples"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "statistics"
        value {
          artifact_spec {
            type {
              name: "ExampleStatistics"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "CsvExampleGen"
    downstream_nodes: "ExampleValidator"
    downstream_nodes: "SchemaGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.schema_gen.component.SchemaGen"
      }
      id: "SchemaGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:23.338846"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.SchemaGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:23.338846"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "schema"
        value {
          artifact_spec {
            type {
              name: "Schema"
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
      parameters {
        key: "infer_feature_shape"
        value {
          field_value {
            int_value: 1
          }
        }
      }
    }
    upstream_nodes: "StatisticsGen"
    downstream_nodes: "ExampleValidator"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_validator.component.ExampleValidator"
      }
      id: "ExampleValidator"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:23.338846"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.ExampleValidator"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "schema"
        value {
          channels {
            producer_node_query {
              id: "SchemaGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:23.338846"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.SchemaGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Schema"
              }
            }
            output_key: "schema"
          }
        }
      }
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:23.338846"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "anomalies"
        value {
          artifact_spec {
            type {
              name: "ExampleAnomalies"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "SchemaGen"
    upstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
runtime_spec {
  pipeline_root {
    field_value {
      string_value: "./tfx_pipeline_output/my_pipeline"
    }
  }
  pipeline_run_id {
    field_value {
      string_value: "2021-09-30T02:14:23.338846"
    }
  }
}
execution_mode: SYNC
deployment_config {
  type_url: "type.googleapis.com/tfx.orchestration.IntermediateDeploymentConfig"
  value: "\n\220\001\n\rStatisticsGen\022\177\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\0223\n1\n/tfx.components.statistics_gen.executor.Executor\n\216\001\n\tSchemaGen\022\200\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\022-\n+tfx.components.schema_gen.executor.Executor\n\234\001\n\020ExampleValidator\022\207\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0224\n2tfx.components.example_validator.executor.Executor\n\236\001\n\rCsvExampleGen\022\214\001\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\022@\n>\n<tfx.components.example_gen.csv_example_gen.executor.Executor\022\230\001\n\rCsvExampleGen\022\206\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0223\n1tfx.components.example_gen.driver.FileBasedDriver*`\n0type.googleapis.com/ml_metadata.ConnectionConfig\022,\032*\n&./tfx_metadata/my_pipeline/metadata.db\020\003"
}

INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "ExampleValidator"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_validator.executor.Executor"
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  sqlite {
    filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
    connection_mode: READWRITE_OPENCREATE
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:23.338846"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 3
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/3"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:23.338846:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}), exec_properties={'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_file_format': 5, 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/3/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2021-09-30T02:14:23.338846', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/3/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:23.338846"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:23.338846')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 3 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/3"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:23.338846:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}) for execution 3
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:23.338846"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:23.338846"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 4
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=4, input_dict={'examples': [Artifact(artifact: id: 2
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/3"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:23.338846:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
state: LIVE
create_time_since_epoch: 1632968064331
last_update_time_since_epoch: 1632968064331
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/4"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:23.338846:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/4/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2021-09-30T02:14:23.338846', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/4/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:23.338846"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:23.338846"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:23.338846')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/4/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/4/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 4 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "local_runner.py", line 78, in <module>
    run()
  File "local_runner.py", line 73, in run
    .sqlite_metadata_connection_config(METADATA_PATH)))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/local/local_dag_runner.py", line 91, in run
    component_launcher.launch()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 482, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 358, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 89, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 55, in run_with_executor
    execution_info.exec_properties)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/components/statistics_gen/executor.py", line 142, in Do
    output_uri)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 585, in __exit__
    self.result = self.run()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 196, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 385, in run_stages
    runner_execution_context, bundle_context_manager)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 653, in _run_stage
    bundle_manager))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1080, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
    response = self.worker.do_instruction(request)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 1071, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1074, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1089, in apache_beam.runners.worker.operations.PGBKCVOperation.output_key
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 757, in compact
    self._maybe_do_batch(accumulator, force=True)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 690, in _maybe_do_batch
    accumulator.partial_accumulators)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 649, in _for_each_generator
    self._generators, zip(*args))]
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 648, in <listcomp>
    return [func(gen, *args_for_func) for gen, args_for_func in zip(
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 689, in <lambda>
    lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 1026, in add_input
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 232, in update
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 128, in update
    num_values_grouped = pa.array(num_values_not_none).value_counts()
  File "pyarrow/array.pxi", line 292, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 79, in pyarrow.lib._ndarray_to_array
  File "pyarrow/array.pxi", line 67, in pyarrow.lib._ndarray_to_type
  File "pyarrow/error.pxi", line 107, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object
Error while running "/usr/bin/python3.7 local_runner.py"

If the pipeline ran successfully, you should see "Component Transform is finished." somewhere in the log. Because the Transform component and the ExampleValidator component do not depend on each other, the order of executions is not fixed. That said, either of Transform and ExampleValidator can be the last component in the pipeline execution.

Examine the output of Transform

The Transform component creates two kinds of outputs, a TensorFlow graph and transformed examples. The transformed examples are of the Examples artifact type which is also produced by ExampleGen, but this one contains transformed feature values instead.

You can examine them as we did in the previous step.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
    # Search all artifacts from the previous run of the Transform component.
    artifacts = get_latest_artifacts(metadata_handler.store,
                                     PIPELINE_NAME, "Transform")
    # Find artifacts of Examples type.
    transformed_examples_artifacts = find_latest_artifacts_by_type(
        metadata_handler.store, artifacts,
        standard_artifacts.Examples.TYPE_NAME)
preview_examples(transformed_examples_artifacts)

Step 4. Train your model with the Trainer component.

We will build an ML model using the Trainer component. See the Trainer component guide for more information. You need to provide your model code to the Trainer component.

Define your model.

In the penguin template, models.model.run_fn is used as the run_fn argument for the Trainer component. This means that the run_fn() function in models/model.py will be called when the Trainer component runs. You can see the code to construct a simple DNN model using the keras API in the given code. See the TensorFlow 2.x in TFX guide for more information about using the keras API in TFX.

In this run_fn, you should build a model and save it to a directory pointed to by fn_args.serving_model_dir, which is specified by the component. You can use other arguments in fn_args which are passed into run_fn. See the corresponding code for the full list of arguments in fn_args.

Define your features in models/features.py and use them as needed. If you transformed your features in Step 3, you should use the transformed features as inputs to your model.
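
The following is a minimal sketch of a run_fn, assuming a small Keras model over the numeric penguin features; the input pipeline is elided, so see models/model.py in the template for the complete version:

# models/model.py (sketch)
import tensorflow as tf

from models import features


def _build_keras_model():
  # A small DNN over the numeric features (illustrative, not the exact
  # template model).
  inputs = [tf.keras.layers.Input(shape=(1,), name=key)
            for key in features.FEATURE_KEYS]
  x = tf.keras.layers.concatenate(inputs)
  x = tf.keras.layers.Dense(8, activation='relu')(x)
  outputs = tf.keras.layers.Dense(3, activation='softmax')(x)
  model = tf.keras.Model(inputs=inputs, outputs=outputs)
  model.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['sparse_categorical_accuracy'])
  return model


def run_fn(fn_args):
  # Build tf.data datasets from fn_args.train_files / fn_args.eval_files here,
  # then train with model.fit(...); both steps are elided in this sketch.
  model = _build_keras_model()
  # Export to the directory the Trainer component points at.
  model.save(fn_args.serving_model_dir, save_format='tf')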

Add the Trainer component to the pipeline.

If your run_fn is ready, add the Trainer component to the pipeline.

  1. In the pipeline/pipeline.py file, uncomment # components.append(trainer) to add the component to the pipeline.

Arguments for the Trainer component may depend on whether you use the Transform component or not.

  • If you do not use the Transform component, you don't need to change the arguments.
  • If you use the Transform component, you need to change the arguments when creating a Trainer component instance.

    • Change the examples argument to examples=transform.outputs['transformed_examples']. We need to use transformed examples for training.
    • Add the transform_graph argument like transform_graph=transform.outputs['transform_graph']. This graph contains the TensorFlow graph for the transform operations.
    • After the changes above, the code for creating the Trainer component will look like the following.
    # If you use a Transform component.
    trainer = Trainer(
        run_fn=run_fn,
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        ...
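
For comparison, without Transform the template's default wiring feeds raw examples straight from ExampleGen (a sketch, elided like the block above):

# If you don't use a Transform component.
trainer = Trainer(
    run_fn=run_fn,
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    ...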
    

You can update the pipeline and run it again.

!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Running pipeline:
 pipeline_info {
  id: "my_pipeline"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
      }
      id: "CsvExampleGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:38.532021"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.CsvExampleGen"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "examples"
        value {
          artifact_spec {
            type {
              name: "Examples"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
              properties {
                key: "version"
                value: INT
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "input_base"
        value {
          field_value {
            string_value: "/home/kbuilder/imported/my_pipeline/data"
          }
        }
      }
      parameters {
        key: "input_config"
        value {
          field_value {
            string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
          }
        }
      }
      parameters {
        key: "output_config"
        value {
          field_value {
            string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
          }
        }
      }
      parameters {
        key: "output_data_format"
        value {
          field_value {
            int_value: 6
          }
        }
      }
      parameters {
        key: "output_file_format"
        value {
          field_value {
            int_value: 5
          }
        }
      }
    }
    downstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.statistics_gen.component.StatisticsGen"
      }
      id: "StatisticsGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:38.532021"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.StatisticsGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "examples"
        value {
          channels {
            producer_node_query {
              id: "CsvExampleGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:38.532021"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.CsvExampleGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Examples"
              }
            }
            output_key: "examples"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "statistics"
        value {
          artifact_spec {
            type {
              name: "ExampleStatistics"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "CsvExampleGen"
    downstream_nodes: "ExampleValidator"
    downstream_nodes: "SchemaGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.schema_gen.component.SchemaGen"
      }
      id: "SchemaGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:38.532021"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.SchemaGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:38.532021"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "schema"
        value {
          artifact_spec {
            type {
              name: "Schema"
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
      parameters {
        key: "infer_feature_shape"
        value {
          field_value {
            int_value: 1
          }
        }
      }
    }
    upstream_nodes: "StatisticsGen"
    downstream_nodes: "ExampleValidator"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_validator.component.ExampleValidator"
      }
      id: "ExampleValidator"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:38.532021"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.ExampleValidator"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "schema"
        value {
          channels {
            producer_node_query {
              id: "SchemaGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:38.532021"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.SchemaGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Schema"
              }
            }
            output_key: "schema"
          }
        }
      }
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:38.532021"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "anomalies"
        value {
          artifact_spec {
            type {
              name: "ExampleAnomalies"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "SchemaGen"
    upstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
runtime_spec {
  pipeline_root {
    field_value {
      string_value: "./tfx_pipeline_output/my_pipeline"
    }
  }
  pipeline_run_id {
    field_value {
      string_value: "2021-09-30T02:14:38.532021"
    }
  }
}
execution_mode: SYNC
deployment_config {
  type_url: "type.googleapis.com/tfx.orchestration.IntermediateDeploymentConfig"
  value: "\n\234\001\n\020ExampleValidator\022\207\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0224\n2tfx.components.example_validator.executor.Executor\n\216\001\n\tSchemaGen\022\200\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\022-\n+tfx.components.schema_gen.executor.Executor\n\220\001\n\rStatisticsGen\022\177\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\0223\n1\n/tfx.components.statistics_gen.executor.Executor\n\236\001\n\rCsvExampleGen\022\214\001\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\022@\n>\n<tfx.components.example_gen.csv_example_gen.executor.Executor\022\230\001\n\rCsvExampleGen\022\206\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0223\n1tfx.components.example_gen.driver.FileBasedDriver*`\n0type.googleapis.com/ml_metadata.ConnectionConfig\022,\032*\n&./tfx_metadata/my_pipeline/metadata.db\020\003"
}

INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "ExampleValidator"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_validator.executor.Executor"
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  sqlite {
    filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
    connection_mode: READWRITE_OPENCREATE
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:38.532021"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 5
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=5, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/5"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:38.532021:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}), exec_properties={'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_file_format': 5, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'output_data_format': 6, 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/5/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2021-09-30T02:14:38.532021', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/5/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:38.532021"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:38.532021')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 5 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/5"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:38.532021:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}) for execution 5
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:38.532021"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:38.532021"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 6
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=6, input_dict={'examples': [Artifact(artifact: id: 3
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/5"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:38.532021:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
state: LIVE
create_time_since_epoch: 1632968079531
last_update_time_since_epoch: 1632968079531
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/6"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:38.532021:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/6/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2021-09-30T02:14:38.532021', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/6/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:38.532021"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:38.532021"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:38.532021')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/6/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/6/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 6 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "local_runner.py", line 78, in <module>
    run()
  File "local_runner.py", line 73, in run
    .sqlite_metadata_connection_config(METADATA_PATH)))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/local/local_dag_runner.py", line 91, in run
    component_launcher.launch()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 482, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 358, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 89, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 55, in run_with_executor
    execution_info.exec_properties)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/components/statistics_gen/executor.py", line 142, in Do
    output_uri)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 585, in __exit__
    self.result = self.run()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 196, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 385, in run_stages
    runner_execution_context, bundle_context_manager)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 653, in _run_stage
    bundle_manager))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1080, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
    response = self.worker.do_instruction(request)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 1071, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1074, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1089, in apache_beam.runners.worker.operations.PGBKCVOperation.output_key
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 757, in compact
    self._maybe_do_batch(accumulator, force=True)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 690, in _maybe_do_batch
    accumulator.partial_accumulators)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 649, in _for_each_generator
    self._generators, zip(*args))]
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 648, in <listcomp>
    return [func(gen, *args_for_func) for gen, args_for_func in zip(
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 689, in <lambda>
    lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 1026, in add_input
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 232, in update
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 128, in update
    num_values_grouped = pa.array(num_values_not_none).value_counts()
  File "pyarrow/array.pxi", line 292, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 79, in pyarrow.lib._ndarray_to_array
  File "pyarrow/array.pxi", line 67, in pyarrow.lib._ndarray_to_type
  File "pyarrow/error.pxi", line 107, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object
Error while running "/usr/bin/python3.7 local_runner.py"

When this execution finishes successfully, you have created and run your first TFX pipeline for your model. Congratulations!

Your new model will be located somewhere under the output directory, but it would be better to keep the model in a fixed location or service outside of the TFX pipeline, which holds many intermediate results. Better yet is continuous evaluation of the built model, which is critical in production ML systems. We will see how continuous evaluation and deployment work in TFX in the next step.

Step 5. (Optional) Evaluate the model with Evaluator and publish it with Pusher.

The Evaluator component continuously evaluates every model built by Trainer, and Pusher copies the model to a predefined location in the filesystem, or even to Google Cloud AI Platform Models.

Add the Evaluator component to the pipeline.

In the pipeline/pipeline.py file:

  1. Uncomment # components.append(model_resolver) to add the latest-model resolver to the pipeline. Evaluator can be used to compare a new model against the old baseline model which passed Evaluator in the last pipeline run. LatestBlessedModelResolver finds the latest model which passed Evaluator.
  2. Set a proper tfma.MetricsSpec for your model. Evaluation can be different for every ML model. In the penguin template, SparseCategoricalAccuracy was used because we are solving a multi-category classification problem. You also need to specify tfma.SliceSpec to analyze your model for specific slices. For more detail, see the Evaluator component guide.
  3. Uncomment # components.append(evaluator) to add the component to the pipeline. A minimal sketch of the resulting configuration follows this list.
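For reference, here is a minimal sketch of what the uncommented Evaluator configuration in pipeline/pipeline.py might look like, written against the TFX 1.x public API. It assumes the template's component variable names (example_gen, trainer), the penguin label key 'species', and an arbitrary 0.6 accuracy threshold; adapt these to your own dataset.

import tensorflow_model_analysis as tfma
from tfx import v1 as tfx

# The resolver finds the latest model that was "blessed" (passed Evaluator)
# in a previous pipeline run, to be used as the baseline for comparison.
model_resolver = tfx.dsl.Resolver(
    strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
    model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
    model_blessing=tfx.dsl.Channel(
        type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
            'latest_blessed_model_resolver')

# Metrics for a multi-category classification problem. 'species' and the
# 0.6 lower bound are placeholder assumptions, not values from the template.
eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='species')],
    slicing_specs=[tfma.SlicingSpec()],  # an empty SlicingSpec means "overall"
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.6})))
        ])
    ])

evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)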

You can update the pipeline and run it again.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Running pipeline:
 pipeline_info {
  id: "my_pipeline"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
      }
      id: "CsvExampleGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:53.779774"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.CsvExampleGen"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "examples"
        value {
          artifact_spec {
            type {
              name: "Examples"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
              properties {
                key: "version"
                value: INT
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "input_base"
        value {
          field_value {
            string_value: "/home/kbuilder/imported/my_pipeline/data"
          }
        }
      }
      parameters {
        key: "input_config"
        value {
          field_value {
            string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
          }
        }
      }
      parameters {
        key: "output_config"
        value {
          field_value {
            string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
          }
        }
      }
      parameters {
        key: "output_data_format"
        value {
          field_value {
            int_value: 6
          }
        }
      }
      parameters {
        key: "output_file_format"
        value {
          field_value {
            int_value: 5
          }
        }
      }
    }
    downstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.statistics_gen.component.StatisticsGen"
      }
      id: "StatisticsGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:53.779774"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.StatisticsGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "examples"
        value {
          channels {
            producer_node_query {
              id: "CsvExampleGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:53.779774"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.CsvExampleGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Examples"
              }
            }
            output_key: "examples"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "statistics"
        value {
          artifact_spec {
            type {
              name: "ExampleStatistics"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "CsvExampleGen"
    downstream_nodes: "ExampleValidator"
    downstream_nodes: "SchemaGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.schema_gen.component.SchemaGen"
      }
      id: "SchemaGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:53.779774"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.SchemaGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:53.779774"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "schema"
        value {
          artifact_spec {
            type {
              name: "Schema"
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
      parameters {
        key: "infer_feature_shape"
        value {
          field_value {
            int_value: 1
          }
        }
      }
    }
    upstream_nodes: "StatisticsGen"
    downstream_nodes: "ExampleValidator"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_validator.component.ExampleValidator"
      }
      id: "ExampleValidator"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:14:53.779774"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.ExampleValidator"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "schema"
        value {
          channels {
            producer_node_query {
              id: "SchemaGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:53.779774"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.SchemaGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Schema"
              }
            }
            output_key: "schema"
          }
        }
      }
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:14:53.779774"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "anomalies"
        value {
          artifact_spec {
            type {
              name: "ExampleAnomalies"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "SchemaGen"
    upstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
runtime_spec {
  pipeline_root {
    field_value {
      string_value: "./tfx_pipeline_output/my_pipeline"
    }
  }
  pipeline_run_id {
    field_value {
      string_value: "2021-09-30T02:14:53.779774"
    }
  }
}
execution_mode: SYNC
deployment_config {
  type_url: "type.googleapis.com/tfx.orchestration.IntermediateDeploymentConfig"
  value: "\n\220\001\n\rStatisticsGen\022\177\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\0223\n1\n/tfx.components.statistics_gen.executor.Executor\n\216\001\n\tSchemaGen\022\200\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\022-\n+tfx.components.schema_gen.executor.Executor\n\234\001\n\020ExampleValidator\022\207\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0224\n2tfx.components.example_validator.executor.Executor\n\236\001\n\rCsvExampleGen\022\214\001\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\022@\n>\n<tfx.components.example_gen.csv_example_gen.executor.Executor\022\230\001\n\rCsvExampleGen\022\206\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0223\n1tfx.components.example_gen.driver.FileBasedDriver*`\n0type.googleapis.com/ml_metadata.ConnectionConfig\022,\032*\n&./tfx_metadata/my_pipeline/metadata.db\020\003"
}

INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "ExampleValidator"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_validator.executor.Executor"
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  sqlite {
    filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
    connection_mode: READWRITE_OPENCREATE
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:53.779774"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 7
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=7, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:53.779774:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}), exec_properties={'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_data_format': 6, 'output_file_format': 5, 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2021-09-30T02:14:53.779774', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/7/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:53.779774"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:53.779774')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 7 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:53.779774:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}) for execution 7
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:53.779774"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:53.779774"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 8
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=8, input_dict={'examples': [Artifact(artifact: id: 4
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/7"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:53.779774:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
state: LIVE
create_time_since_epoch: 1632968094773
last_update_time_since_epoch: 1632968094773
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:14:53.779774:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2021-09-30T02:14:53.779774', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/8/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:14:53.779774"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:14:53.779774"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:14:53.779774')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/8/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 8 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "local_runner.py", line 78, in <module>
    run()
  File "local_runner.py", line 73, in run
    .sqlite_metadata_connection_config(METADATA_PATH)))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/local/local_dag_runner.py", line 91, in run
    component_launcher.launch()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 482, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 358, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 89, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 55, in run_with_executor
    execution_info.exec_properties)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/components/statistics_gen/executor.py", line 142, in Do
    output_uri)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 585, in __exit__
    self.result = self.run()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 196, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 385, in run_stages
    runner_execution_context, bundle_context_manager)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 653, in _run_stage
    bundle_manager))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1080, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
    response = self.worker.do_instruction(request)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 1071, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1074, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1089, in apache_beam.runners.worker.operations.PGBKCVOperation.output_key
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 757, in compact
    self._maybe_do_batch(accumulator, force=True)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 690, in _maybe_do_batch
    accumulator.partial_accumulators)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 649, in _for_each_generator
    self._generators, zip(*args))]
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 648, in <listcomp>
    return [func(gen, *args_for_func) for gen, args_for_func in zip(
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 689, in <lambda>
    lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 1026, in add_input
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 232, in update
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 128, in update
    num_values_grouped = pa.array(num_values_not_none).value_counts()
  File "pyarrow/array.pxi", line 292, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 79, in pyarrow.lib._ndarray_to_array
  File "pyarrow/array.pxi", line 67, in pyarrow.lib._ndarray_to_type
  File "pyarrow/error.pxi", line 107, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object
Error while running "/usr/bin/python3.7 local_runner.py"

Examine the output of Evaluator

This step requires the TensorFlow Model Analysis (TFMA) Jupyter notebook extension. Note that the version of the TFMA notebook extension must be identical to the version of the TFMA Python package.

The following command installs the TFMA notebook extension from the NPM registry. It can take several minutes to complete.

# Install TFMA notebook extension.
!jupyter labextension install tensorflow_model_analysis@{tfma.__version__}
usage: jupyter [-h] [--version] [--config-dir] [--data-dir] [--runtime-dir]
               [--paths] [--json] [--debug]
               [subcommand]

Jupyter: Interactive Computing

positional arguments:
  subcommand     the subcommand to launch

optional arguments:
  -h, --help     show this help message and exit
  --version      show the versions of core jupyter packages and exit
  --config-dir   show Jupyter config dir
  --data-dir     show Jupyter data dir
  --runtime-dir  show Jupyter runtime dir
  --paths        show all Jupyter paths. Add --json for machine-readable
                 format.
  --json         output paths as machine-readable json
  --debug        output debug information about paths

Available subcommands: bundlerextension console dejavu kernel kernelspec
migrate nbconvert nbextension notebook qtconsole run serverextension
troubleshoot trust

Jupyter command `jupyter-labextension` not found.

If the installation completed, please reload your browser so that the extension takes effect. The following code reuses the helper functions defined earlier in this tutorial to load the latest evaluation result and render the slicing metrics.

with metadata.Metadata(metadata_connection_config) as metadata_handler:
  # Search for all artifacts from the previous pipeline run.
  artifacts = get_latest_artifacts(metadata_handler.store, PIPELINE_NAME)
  model_evaluation_artifacts = find_latest_artifacts_by_type(
      metadata_handler.store, artifacts,
      standard_artifacts.ModelEvaluation.TYPE_NAME)
if model_evaluation_artifacts:
  tfma_result = tfma.load_eval_result(model_evaluation_artifacts[0].uri)
  tfma.view.render_slicing_metrics(tfma_result)

Add the Pusher component to the pipeline.

If the model looks promising, we need to publish it. The Pusher component can publish the model to a location in the filesystem, or to GCP AI Platform Models using a custom executor.

The Evaluator component continuously evaluates every model built by the Trainer, and the Pusher copies the model to a predefined location in the filesystem or even to Google Cloud Platform AI Platform Models.

  1. In local_runner.py, set SERVING_MODEL_DIR to a directory to publish to.
  2. In the pipeline/pipeline.py file, uncomment # components.append(pusher) to add the Pusher to the pipeline (a sketch of the relevant code follows this list).
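Below is a minimal sketch of what the Pusher definition in pipeline/pipeline.py looks like. The trainer and evaluator components and the serving_model_dir variable are assumed to be the objects the template already defines, and exact import paths vary slightly across TFX versions.

# Sketch: Pusher copies a blessed model to the serving directory.
# `trainer`, `evaluator` and `serving_model_dir` come from the template.
from tfx.components import Pusher
from tfx.proto import pusher_pb2

pusher = Pusher(
    model=trainer.outputs['model'],
    # Only publish models that passed the Evaluator's blessing.
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir)))
components.append(pusher)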

You can then update the pipeline and run it again.

# Update and run the pipeline.
!tfx pipeline update --engine=local --pipeline_path=local_runner.py \
 && tfx run create --engine=local --pipeline_name={PIPELINE_NAME}
CLI
Updating pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
Pipeline "my_pipeline" updated successfully.
CLI
Creating a run for pipeline: my_pipeline
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Running pipeline:
 pipeline_info {
  id: "my_pipeline"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
      }
      id: "CsvExampleGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:15:09.210435"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.CsvExampleGen"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "examples"
        value {
          artifact_spec {
            type {
              name: "Examples"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
              properties {
                key: "version"
                value: INT
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "input_base"
        value {
          field_value {
            string_value: "/home/kbuilder/imported/my_pipeline/data"
          }
        }
      }
      parameters {
        key: "input_config"
        value {
          field_value {
            string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
          }
        }
      }
      parameters {
        key: "output_config"
        value {
          field_value {
            string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
          }
        }
      }
      parameters {
        key: "output_data_format"
        value {
          field_value {
            int_value: 6
          }
        }
      }
      parameters {
        key: "output_file_format"
        value {
          field_value {
            int_value: 5
          }
        }
      }
    }
    downstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.statistics_gen.component.StatisticsGen"
      }
      id: "StatisticsGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:15:09.210435"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.StatisticsGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "examples"
        value {
          channels {
            producer_node_query {
              id: "CsvExampleGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:15:09.210435"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.CsvExampleGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Examples"
              }
            }
            output_key: "examples"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "statistics"
        value {
          artifact_spec {
            type {
              name: "ExampleStatistics"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "CsvExampleGen"
    downstream_nodes: "ExampleValidator"
    downstream_nodes: "SchemaGen"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.schema_gen.component.SchemaGen"
      }
      id: "SchemaGen"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:15:09.210435"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.SchemaGen"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:15:09.210435"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "schema"
        value {
          artifact_spec {
            type {
              name: "Schema"
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
      parameters {
        key: "infer_feature_shape"
        value {
          field_value {
            int_value: 1
          }
        }
      }
    }
    upstream_nodes: "StatisticsGen"
    downstream_nodes: "ExampleValidator"
    execution_options {
      caching_options {
      }
    }
  }
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.components.example_validator.component.ExampleValidator"
      }
      id: "ExampleValidator"
    }
    contexts {
      contexts {
        type {
          name: "pipeline"
        }
        name {
          field_value {
            string_value: "my_pipeline"
          }
        }
      }
      contexts {
        type {
          name: "pipeline_run"
        }
        name {
          field_value {
            string_value: "2021-09-30T02:15:09.210435"
          }
        }
      }
      contexts {
        type {
          name: "node"
        }
        name {
          field_value {
            string_value: "my_pipeline.ExampleValidator"
          }
        }
      }
    }
    inputs {
      inputs {
        key: "schema"
        value {
          channels {
            producer_node_query {
              id: "SchemaGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:15:09.210435"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.SchemaGen"
                }
              }
            }
            artifact_query {
              type {
                name: "Schema"
              }
            }
            output_key: "schema"
          }
        }
      }
      inputs {
        key: "statistics"
        value {
          channels {
            producer_node_query {
              id: "StatisticsGen"
            }
            context_queries {
              type {
                name: "pipeline"
              }
              name {
                field_value {
                  string_value: "my_pipeline"
                }
              }
            }
            context_queries {
              type {
                name: "pipeline_run"
              }
              name {
                field_value {
                  string_value: "2021-09-30T02:15:09.210435"
                }
              }
            }
            context_queries {
              type {
                name: "node"
              }
              name {
                field_value {
                  string_value: "my_pipeline.StatisticsGen"
                }
              }
            }
            artifact_query {
              type {
                name: "ExampleStatistics"
              }
            }
            output_key: "statistics"
          }
        }
      }
    }
    outputs {
      outputs {
        key: "anomalies"
        value {
          artifact_spec {
            type {
              name: "ExampleAnomalies"
              properties {
                key: "span"
                value: INT
              }
              properties {
                key: "split_names"
                value: STRING
              }
            }
          }
        }
      }
    }
    parameters {
      parameters {
        key: "exclude_splits"
        value {
          field_value {
            string_value: "[]"
          }
        }
      }
    }
    upstream_nodes: "SchemaGen"
    upstream_nodes: "StatisticsGen"
    execution_options {
      caching_options {
      }
    }
  }
}
runtime_spec {
  pipeline_root {
    field_value {
      string_value: "./tfx_pipeline_output/my_pipeline"
    }
  }
  pipeline_run_id {
    field_value {
      string_value: "2021-09-30T02:15:09.210435"
    }
  }
}
execution_mode: SYNC
deployment_config {
  type_url: "type.googleapis.com/tfx.orchestration.IntermediateDeploymentConfig"
  value: "\n\234\001\n\020ExampleValidator\022\207\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0224\n2tfx.components.example_validator.executor.Executor\n\236\001\n\rCsvExampleGen\022\214\001\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\022@\n>\n<tfx.components.example_gen.csv_example_gen.executor.Executor\n\216\001\n\tSchemaGen\022\200\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\022-\n+tfx.components.schema_gen.executor.Executor\n\220\001\n\rStatisticsGen\022\177\nHtype.googleapis.com/tfx.orchestration.executable_spec.BeamExecutableSpec\0223\n1\n/tfx.components.statistics_gen.executor.Executor\022\230\001\n\rCsvExampleGen\022\206\001\nOtype.googleapis.com/tfx.orchestration.executable_spec.PythonClassExecutableSpec\0223\n1tfx.components.example_gen.driver.FileBasedDriver*`\n0type.googleapis.com/ml_metadata.ConnectionConfig\022,\032*\n&./tfx_metadata/my_pipeline/metadata.db\020\003"
}

INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "ExampleValidator"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_validator.executor.Executor"
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  sqlite {
    filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
    connection_mode: READWRITE_OPENCREATE
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "./tfx_metadata/my_pipeline/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CsvExampleGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:15:09.210435"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 9
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=9, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/9"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:15:09.210435:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}), exec_properties={'output_data_format': 6, 'input_config': '{\n  "splits": [\n    {\n      "name": "single_split",\n      "pattern": "*"\n    }\n  ]\n}', 'input_base': '/home/kbuilder/imported/my_pipeline/data', 'output_config': '{\n  "split_config": {\n    "splits": [\n      {\n        "hash_buckets": 2,\n        "name": "train"\n      },\n      {\n        "hash_buckets": 1,\n        "name": "eval"\n      }\n    ]\n  }\n}', 'output_file_format': 5, 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/9/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/stateful_working_dir/2021-09-30T02:15:09.210435', tmp_dir='./tfx_pipeline_output/my_pipeline/CsvExampleGen/.system/executor_execution/9/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen"
  }
  id: "CsvExampleGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:15:09.210435"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.CsvExampleGen"
      }
    }
  }
}
outputs {
  outputs {
    key: "examples"
    value {
      artifact_spec {
        type {
          name: "Examples"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
          properties {
            key: "version"
            value: INT
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "input_base"
    value {
      field_value {
        string_value: "/home/kbuilder/imported/my_pipeline/data"
      }
    }
  }
  parameters {
    key: "input_config"
    value {
      field_value {
        string_value: "{\n  \"splits\": [\n    {\n      \"name\": \"single_split\",\n      \"pattern\": \"*\"\n    }\n  ]\n}"
      }
    }
  }
  parameters {
    key: "output_config"
    value {
      field_value {
        string_value: "{\n  \"split_config\": {\n    \"splits\": [\n      {\n        \"hash_buckets\": 2,\n        \"name\": \"train\"\n      },\n      {\n        \"hash_buckets\": 1,\n        \"name\": \"eval\"\n      }\n    ]\n  }\n}"
      }
    }
  }
  parameters {
    key: "output_data_format"
    value {
      field_value {
        int_value: 6
      }
    }
  }
  parameters {
    key: "output_file_format"
    value {
      field_value {
        int_value: 5
      }
    }
  }
}
downstream_nodes: "StatisticsGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:15:09.210435')
INFO:absl:Generating examples.
INFO:absl:Processing input csv data /home/kbuilder/imported/my_pipeline/data/* to TFExample.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
INFO:absl:Examples generated.
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 9 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/9"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:15:09.210435:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}) for execution 9
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component CsvExampleGen is finished.
INFO:absl:Component StatisticsGen is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:15:09.210435"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:15:09.210435"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 10
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=10, input_dict={'examples': [Artifact(artifact: id: 5
type_id: 15
uri: "./tfx_pipeline_output/my_pipeline/CsvExampleGen/examples/9"
properties {
  key: "split_names"
  value {
    string_value: "[\"train\", \"eval\"]"
  }
}
custom_properties {
  key: "file_format"
  value {
    string_value: "tfrecords_gzip"
  }
}
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1632968027,sum_checksum:1632968027"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:15:09.210435:CsvExampleGen:examples:0"
  }
}
custom_properties {
  key: "payload_format"
  value {
    string_value: "FORMAT_TF_EXAMPLE"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.2.0"
  }
}
state: LIVE
create_time_since_epoch: 1632968110211
last_update_time_since_epoch: 1632968110211
, artifact_type: id: 15
name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"
  value: INT
}
)]}, output_dict=defaultdict(<class 'list'>, {'statistics': [Artifact(artifact: uri: "./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/10"
custom_properties {
  key: "name"
  value {
    string_value: "my_pipeline:2021-09-30T02:15:09.210435:StatisticsGen:statistics:0"
  }
}
, artifact_type: name: "ExampleStatistics"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
)]}), exec_properties={'exclude_splits': '[]'}, execution_output_uri='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/10/executor_output.pb', stateful_working_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/stateful_working_dir/2021-09-30T02:15:09.210435', tmp_dir='./tfx_pipeline_output/my_pipeline/StatisticsGen/.system/executor_execution/10/.temp/', pipeline_node=node_info {
  type {
    name: "tfx.components.statistics_gen.component.StatisticsGen"
  }
  id: "StatisticsGen"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "my_pipeline"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2021-09-30T02:15:09.210435"
      }
    }
  }
  contexts {
    type {
      name: "node"
    }
    name {
      field_value {
        string_value: "my_pipeline.StatisticsGen"
      }
    }
  }
}
inputs {
  inputs {
    key: "examples"
    value {
      channels {
        producer_node_query {
          id: "CsvExampleGen"
        }
        context_queries {
          type {
            name: "pipeline"
          }
          name {
            field_value {
              string_value: "my_pipeline"
            }
          }
        }
        context_queries {
          type {
            name: "pipeline_run"
          }
          name {
            field_value {
              string_value: "2021-09-30T02:15:09.210435"
            }
          }
        }
        context_queries {
          type {
            name: "node"
          }
          name {
            field_value {
              string_value: "my_pipeline.CsvExampleGen"
            }
          }
        }
        artifact_query {
          type {
            name: "Examples"
          }
        }
        output_key: "examples"
      }
    }
  }
}
outputs {
  outputs {
    key: "statistics"
    value {
      artifact_spec {
        type {
          name: "ExampleStatistics"
          properties {
            key: "span"
            value: INT
          }
          properties {
            key: "split_names"
            value: STRING
          }
        }
      }
    }
  }
}
parameters {
  parameters {
    key: "exclude_splits"
    value {
      field_value {
        string_value: "[]"
      }
    }
  }
}
upstream_nodes: "CsvExampleGen"
downstream_nodes: "ExampleValidator"
downstream_nodes: "SchemaGen"
execution_options {
  caching_options {
  }
}
, pipeline_info=id: "my_pipeline"
, pipeline_run_id='2021-09-30T02:15:09.210435')
INFO:absl:Generating statistics for split train.
INFO:absl:Statistics for split train written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/10/Split-train.
INFO:absl:Generating statistics for split eval.
INFO:absl:Statistics for split eval written to ./tfx_pipeline_output/my_pipeline/StatisticsGen/statistics/10/Split-eval.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:absl:MetadataStore with DB connection initialized
ERROR:absl:Execution 10 failed.
INFO:absl:Cleaning up stateless execution info.
Traceback (most recent call last):
  File "local_runner.py", line 78, in <module>
    run()
  File "local_runner.py", line 73, in run
    .sqlite_metadata_connection_config(METADATA_PATH)))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/local/local_dag_runner.py", line 91, in run
    component_launcher.launch()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 482, in launch
    executor_output = self._run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/launcher.py", line 358, in _run_executor
    executor_output = self._executor_operator.run_executor(execution_info)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/beam_executor_operator.py", line 89, in run_executor
    return python_executor_operator.run_with_executor(execution_info, executor)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/orchestration/portable/python_executor_operator.py", line 55, in run_with_executor
    execution_info.exec_properties)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/components/statistics_gen/executor.py", line 142, in Do
    output_uri)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 585, in __exit__
    self.result = self.run()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 196, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 206, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 385, in run_stages
    runner_execution_context, bundle_context_manager)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 653, in _run_stage
    bundle_manager))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 770, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1080, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
    response = self.worker.do_instruction(request)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 602, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1002, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 1071, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1074, in apache_beam.runners.worker.operations.PGBKCVOperation.finish
  File "apache_beam/runners/worker/operations.py", line 1089, in apache_beam.runners.worker.operations.PGBKCVOperation.output_key
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 757, in compact
    self._maybe_do_batch(accumulator, force=True)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 690, in _maybe_do_batch
    accumulator.partial_accumulators)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 649, in _for_each_generator
    self._generators, zip(*args))]
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 648, in <listcomp>
    return [func(gen, *args_for_func) for gen, args_for_func in zip(
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/stats_impl.py", line 689, in <lambda>
    lambda gen, gen_acc: gen.add_input(gen_acc, record_batch),
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 1026, in add_input
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 232, in update
    weights)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tensorflow_data_validation/statistics/generators/basic_stats_generator.py", line 128, in update
    num_values_grouped = pa.array(num_values_not_none).value_counts()
  File "pyarrow/array.pxi", line 292, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 79, in pyarrow.lib._ndarray_to_array
  File "pyarrow/array.pxi", line 67, in pyarrow.lib._ndarray_to_type
  File "pyarrow/error.pxi", line 107, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object
Error while running "/usr/bin/python3.7 local_runner.py"

You should be able to find your new model at SERVING_MODEL_DIR.

Step 6. (Optional) Deploy your pipeline to Kubeflow Pipelines on GCP.

As mentioned earlier, local_runner.py is good for debugging and development purposes, but it is not the best solution for production workloads. In this step, we will deploy the pipeline to Kubeflow Pipelines on Google Cloud.

Preparation

We need the kfp Python package and the skaffold program to deploy a pipeline to a Kubeflow Pipelines cluster.

import sys
!{sys.executable} -m pip install --upgrade -q kfp==1.0.0

# Download skaffold and set it executable.
!curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold

You need to move the skaffold binary to a place where your shell can find it. Alternatively, you can specify the path to skaffold with the --skaffold-cmd flag when you run the tfx binary, as sketched after the command below.

# Move skaffold binary into your path
mv skaffold /home/jupyter/.local/bin/
mv: cannot move 'skaffold' to '/home/jupyter/.local/bin/': No such file or directory
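If the move fails, as it does above because the target directory does not exist in this environment, you can instead point the CLI at the downloaded binary. A sketch, combining the --skaffold-cmd flag mentioned above with the pipeline-create invocation used later in this step:

# Pass the path to the downloaded skaffold binary explicitly.
!tfx pipeline create \
  --engine=kubeflow \
  --pipeline-path=kubeflow_runner.py \
  --endpoint={ENDPOINT} \
  --skaffold-cmd=$(pwd)/skaffold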

You also need a Kubeflow Pipelines cluster to run the pipeline. Please follow steps 1 and 2 of the TFX on Cloud AI Platform Pipelines tutorial.

When your cluster is ready, open the pipelines dashboard by clicking Open Pipelines Dashboard on the Pipelines page of the Google Cloud console. The URL of this page is the ENDPOINT used to request pipeline runs. The endpoint value is everything in the URL after https://, up to and including googleusercontent.com. Put your endpoint in the following code block.

ENDPOINT='' # Enter your ENDPOINT here.
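For example, if the dashboard URL for a (hypothetical) deployment were https://1a2b3c4d5e6f-dot-us-central1.pipelines.googleusercontent.com/#/pipelines, the value would be:

# Hypothetical value; copy the one from your own dashboard URL.
ENDPOINT='1a2b3c4d5e6f-dot-us-central1.pipelines.googleusercontent.com'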

To run our code in a Kubeflow Pipelines cluster, we need to package it into a container image. The image will be built automatically while deploying the pipeline; you only need to set a name and a container registry for it. In our example, we will use Google Container Registry and name the image tfx-pipeline.

# Read GCP project id from env.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]

# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE='gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

Set the data location.

Your data must be accessible from the Kubeflow Pipelines cluster. If you have been using data in your local environment, you might need to upload it to remote storage such as Google Cloud Storage. For example, we can upload the penguin data to the default bucket, which is created automatically when a Kubeflow Pipelines cluster is deployed, as follows.

gsutil cp data/data.csv gs://{GOOGLE_CLOUD_PROJECT}-kubeflowpipelines-default/tfx-template/data/penguin/
Copying file://data/data.csv [Content-Type=text/csv]...
NotFoundException: 404 The destination bucket gs://tf-benchmark-dashboard-kubeflowpipelines-default does not exist or the write to the destination must be restarted

Update the location of the stored data in DATA_PATH in kubeflow_runner.py, as sketched below.
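A sketch of what the updated line might look like for the default bucket used above; the variable and config names (e.g. configs.GOOGLE_CLOUD_PROJECT) are assumptions based on the template layout and may differ in your copy:

# In kubeflow_runner.py: point DATA_PATH at the uploaded data
# (configs.GOOGLE_CLOUD_PROJECT is assumed to come from pipeline/configs.py).
DATA_PATH = 'gs://{}-kubeflowpipelines-default/tfx-template/data/penguin/'.format(
    configs.GOOGLE_CLOUD_PROJECT)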

If you use BigQueryExampleGen, there is no need to upload the data file, but please make sure that kubeflow_runner.py uses the same query and beam_pipeline_args arguments for the pipeline.create_pipeline() function.

Deploy the pipeline.

If everything is ready, you can create a pipeline using the tfx pipeline create command.

!tfx pipeline create  \
--engine=kubeflow \
--pipeline-path=kubeflow_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
CLI
[Error] --build-target-image flag was DELETED. You should specify the build target image at the `KubeflowDagRunnerConfig` class instead, and use --build-image flag without argument to build a container image when creating or updating a pipeline.
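As the error message explains, recent TFX CLI versions removed --build-target-image: the target image is now set in kubeflow_runner.py via the KubeflowDagRunnerConfig class (e.g. its tfx_image argument), and the CLI only takes a bare --build-image flag. A sketch of the updated invocation under that assumption:

# Updated form for newer TFX CLI versions; the image name is configured
# in kubeflow_runner.py (KubeflowDagRunnerConfig) instead of on the CLI.
!tfx pipeline create \
  --engine=kubeflow \
  --pipeline-path=kubeflow_runner.py \
  --endpoint={ENDPOINT} \
  --build-image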

Now start a run with the newly created pipeline using the tfx run create command.

tfx run create --engine=kubeflow --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
CLI
Creating a run for pipeline: my_pipeline
Failed to load kube config.
Traceback (most recent call last):
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connection.py", line 175, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/util/connection.py", line 96, in create_connection
    raise err
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/util/connection.py", line 86, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 706, in urlopen
    chunked=chunked,
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 394, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connection.py", line 239, in request
    super(HTTPConnection, self).request(method, url, body=body, headers=headers)
  File "/usr/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/usr/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connection.py", line 205, in connect
    conn = self._new_conn()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connection.py", line 187, in _new_conn
    self, "Failed to establish a new connection: %s" % e
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7fa2b0975190>: Failed to establish a new connection: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/kbuilder/.local/bin/tfx", line 8, in <module>
    sys.exit(cli_group())
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/decorators.py", line 73, in new_func
    return ctx.invoke(f, obj, *args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/tools/cli/commands/run.py", line 87, in create_run
    handler_factory.create_handler(ctx.flags_dict).create_run()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/tools/cli/handler/handler_factory.py", line 98, in create_handler
    return kubeflow_handler.KubeflowHandler(flags_dict)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/tfx/tools/cli/handler/kubeflow_handler.py", line 62, in __init__
    namespace=self.flags_dict[labels.NAMESPACE])
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp/_client.py", line 196, in __init__
    if not self._context_setting['namespace'] and self.get_kfp_healthz(
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp/_client.py", line 410, in get_kfp_healthz
    response = self._healthz_api.get_healthz()
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 63, in get_healthz
    return self.get_healthz_with_http_info(**kwargs)  # noqa: E501
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp_server_api/api/healthz_service_api.py", line 148, in get_healthz_with_http_info
    collection_formats=collection_formats)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 369, in call_api
    _preload_content, _request_timeout, _host)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 185, in __call_api
    _request_timeout=_request_timeout)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp_server_api/api_client.py", line 393, in request
    headers=headers)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp_server_api/rest.py", line 234, in GET
    query_params=query_params)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/kfp_server_api/rest.py", line 212, in request
    headers=headers)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/request.py", line 75, in request
    method, url, fields=fields, headers=headers, **urlopen_kw
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/request.py", line 96, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 796, in urlopen
    **response_kw
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 796, in urlopen
    **response_kw
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 796, in urlopen
    **response_kw
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 756, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/kbuilder/.local/lib/python3.7/site-packages/urllib3/util/retry.py", line 574, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='localhost', port=80): Max retries exceeded with url: /apis/v1beta1/healthz (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fa2b0975190>: Failed to establish a new connection: [Errno 111] Connection refused'))

You can also run the pipeline from the Kubeflow Pipelines dashboard. The new run will be listed under Experiments in the Kubeflow Pipelines dashboard. Clicking on the experiment lets you monitor progress and visualize the artifacts created during the run.

If you are interested in running your pipeline on Kubeflow Pipelines, you can find more instructions in the TFX on Cloud AI Platform Pipelines tutorial.

Clean up

To clean up all the Google Cloud resources used in this step, you can delete the Google Cloud project you used for the tutorial.

Alternatively, you can clean up individual resources by visiting each console: