![]() | ![]() | ![]() |
ペンギンを分類するために本番MLパイプラインを設定するシナリオを想定します。パイプラインは、トレーニングデータを取り込み、モデルをトレーニングして評価し、本番環境にプッシュします。
ただし、後でさまざまな種類のペンギンを含むより大きなデータセットでこのモデルを使用しようとすると、モデルが期待どおりに動作せず、種の分類が正しく開始されないことがわかります。
この時点で、あなたは知ることに興味があります:
- 利用可能なアーティファクトが本番環境のモデルのみである場合、モデルをデバッグするための最も効率的な方法は何ですか?
- モデルのトレーニングに使用されたトレーニングデータセットはどれですか?
- どのトレーニング実行がこの誤ったモデルにつながったのですか?
- モデルの評価結果はどこにありますか?
- どこからデバッグを開始しますか?
MLメタデータ(MLMD)は、MLモデルに関連付けられたメタデータを活用して、これらの質問などに答えるのに役立つライブラリです。有用な例えは、このメタデータをソフトウェア開発へのログインと同等であると考えることです。 MLMDを使用すると、MLパイプラインのさまざまなコンポーネントに関連するアーティファクトと系統を確実に追跡できます。
このチュートリアルでは、TFXパイプラインを設定して、ペンギンを体重、山頂の長さと深さ、足ひれの長さに基づいて3つの種に分類するモデルを作成します。次に、MLMDを使用して、パイプラインコンポーネントの系統を追跡します。
ColabのTFXパイプライン
Colabは、本番環境とは大きく異なる軽量の開発環境です。本番環境では、データの取り込み、変換、モデルトレーニング、実行履歴など、複数の分散システムにまたがるさまざまなパイプラインコンポーネントが存在する場合があります。このチュートリアルでは、オーケストレーションとメタデータストレージに大きな違いがあることに注意してください。これらはすべてColab内でローカルに処理されます。 ColabのTFXについて詳しくは、こちらをご覧ください。
セットアップ
まず、必要なパッケージをインストールしてインポートし、パスを設定して、データをダウンロードします。
アップグレードピップ
ローカルで実行しているときにシステムでPipをアップグレードしないようにするには、Colabで実行していることを確認してください。もちろん、ローカルシステムは個別にアップグレードできます。
try:
import colab
!pip install --upgrade pip
except:
pass
TFXをインストールしてインポートする
pip install -q -U --use-feature=2020-resolver tfx
TFXをインストールした後、Colabランタイムを再起動する必要があります。 ColabメニューからRuntime> Restartruntimeを選択します。これは、Colabがパッケージをロードする方法が原因です。
最初にランタイムを再起動せずに、このチュートリアルの残りの部分に進まないでください。
他のライブラリをインポートする
import os
import tempfile
import urllib
import pandas as pd
TFXコンポーネントクラスをインポートします。
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import evaluator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.utils.dsl_utils import external_input
from tensorflow_metadata.proto.v0 import anomalies_pb2
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2
import tensorflow_model_analysis as tfma
from tfx.components import ResolverNode
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
MLMDライブラリをインポートします。
import ml_metadata as mlmd
from ml_metadata.proto import metadata_store_pb2
データセットをダウンロードする
このコラボでは、 GithubにあるPalmerPenguisデータセットを使用します。不完全なレコードを除外してデータセットを処理し、 island
とsex
列を削除し、ラベルをint32
変換しました。データセットには、ペンギスの山頂の体重と長さおよび深さ、およびそれらの足ひれの長さの334レコードが含まれています。このデータを使用して、ペンギを3つの種のいずれかに分類します。
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/penguins_processed.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "penguins_processed.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
('/tmp/tfx-data49x991k9/penguins_processed.csv', <http.client.HTTPMessage at 0x7fd426207898>)
InteractiveContextを作成する
このノートブックでTFXコンポーネントをインタラクティブに実行するには、 InteractiveContext
作成します。 InteractiveContext
は、一時的なMLMDデータベースインスタンスを持つ一時ディレクトリを使用します。 InteractiveContext
呼び出しは、Colab環境外では何も実行されないことに注意してください。
一般に、 Context
下で同様のパイプライン実行をグループ化することをおContext
ます。
interactive_context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2020-11-26T10_11_55.747278-71_skpdw as root for pipeline outputs. WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2020-11-26T10_11_55.747278-71_skpdw/metadata.sqlite.
TFXパイプラインを構築する
TFXパイプラインは、MLワークフローのさまざまな側面を実行するいくつかのコンポーネントで構成されています。このノートブックでは、 ExampleGen
、 StatisticsGen
、 SchemaGen
、およびTrainer
コンポーネントを作成して実行し、 Evaluator
およびPusher
コンポーネントを使用して、トレーニング済みモデルを評価およびプッシュします。
TFXパイプラインコンポーネントの詳細については、コンポーネントのチュートリアルを参照してください。
ExampleGenコンポーネントをインスタンス化して実行します
input_data = external_input(_data_root)
example_gen = CsvExampleGen(input=input_data)
interactive_context.run(example_gen)
WARNING:tensorflow:From <ipython-input-1-e4ef519b0b92>:1: external_input (from tfx.utils.dsl_utils) is deprecated and will be removed in a future version. Instructions for updating: external_input is deprecated, directly pass the uri to ExampleGen. Warning:tensorflow:From <ipython-input-1-e4ef519b0b92>:1: external_input (from tfx.utils.dsl_utils) is deprecated and will be removed in a future version. Instructions for updating: external_input is deprecated, directly pass the uri to ExampleGen. WARNING:absl:The "input" argument to the CsvExampleGen component has been deprecated by "input_base". Please update your usage as support for this argument will be removed soon. WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features. Warning:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
StatisticsGenコンポーネントをインスタンス化して実行します
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
interactive_context.run(statistics_gen)
SchemaGenコンポーネントをインスタンス化して実行します
infer_schema = SchemaGen(
statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
interactive_context.run(infer_schema)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_data_validation/utils/stats_util.py:247: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version. Instructions for updating: Use eager execution and: `tf.data.TFRecordDataset(path)` Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_data_validation/utils/stats_util.py:247: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version. Instructions for updating: Use eager execution and: `tf.data.TFRecordDataset(path)`
トレーナーコンポーネントをインスタンス化して実行します
# Define the module file for the Trainer component
trainer_module_file = 'penguin_trainer.py'
%%writefile {trainer_module_file}
# Define the training algorithm for the Trainer module file
import os
from typing import List, Text
import tensorflow as tf
from tensorflow import keras
from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import FnArgs
# Features used for classification - culmen length and depth, flipper length,
# body mass, and species.
_FEATURES = {
'culmen_length_mm': tf.io.FixedLenFeature([], dtype=tf.float32),
'culmen_depth_mm': tf.io.FixedLenFeature([], dtype=tf.float32),
'flipper_length_mm': tf.io.FixedLenFeature([], dtype=tf.float32),
'body_mass_g': tf.io.FixedLenFeature([], dtype=tf.float32),
'species': tf.io.FixedLenFeature([], dtype=tf.int64)
}
_LABEL_KEY = 'species'
_FEATURE_KEYS = [
'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
def _gzip_reader_fn(filenames):
return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
def _input_fn(file_pattern: List[Text], batch_size: int = 200):
dataset = tf.data.experimental.make_batched_features_dataset(
file_pattern=file_pattern,
batch_size=batch_size,
features=_FEATURES,
reader=_gzip_reader_fn,
label_key=_LABEL_KEY)
return dataset
def _build_keras_model():
inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
d = keras.layers.concatenate(inputs)
d = keras.layers.Dense(8, activation='relu')(d)
d = keras.layers.Dense(8, activation='relu')(d)
outputs = keras.layers.Dense(3, activation='softmax')(d)
model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=[keras.metrics.SparseCategoricalAccuracy()])
return model
def run_fn(fn_args: TrainerFnArgs):
train_dataset = _input_fn(fn_args.train_files, batch_size=10)
eval_dataset = _input_fn(fn_args.eval_files, batch_size=10)
model = _build_keras_model()
model.fit(
train_dataset,
epochs=int(fn_args.train_steps / 20),
steps_per_epoch=20,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
Trainer
コンポーネントを実行します。
trainer = Trainer(
module_file=os.path.abspath(trainer_module_file),
custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
examples=example_gen.outputs['examples'],
schema=infer_schema.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=100),
eval_args=trainer_pb2.EvalArgs(num_steps=50))
interactive_context.run(trainer)
WARNING:tensorflow:From <ipython-input-1-d2af3312f9a2>:3: The name tfx.components.base.executor_spec.ExecutorClassSpec is deprecated. Please use tfx.dsl.components.base.executor_spec.ExecutorClassSpec instead. Warning:tensorflow:From <ipython-input-1-d2af3312f9a2>:3: The name tfx.components.base.executor_spec.ExecutorClassSpec is deprecated. Please use tfx.dsl.components.base.executor_spec.ExecutorClassSpec instead. Epoch 1/5 20/20 [==============================] - 0s 11ms/step - loss: 0.9556 - sparse_categorical_accuracy: 0.3850 - val_loss: 0.9785 - val_sparse_categorical_accuracy: 0.3100 Epoch 2/5 20/20 [==============================] - 0s 6ms/step - loss: 0.8855 - sparse_categorical_accuracy: 0.4150 - val_loss: 0.9441 - val_sparse_categorical_accuracy: 0.3300 Epoch 3/5 20/20 [==============================] - 0s 6ms/step - loss: 0.8429 - sparse_categorical_accuracy: 0.4550 - val_loss: 0.9160 - val_sparse_categorical_accuracy: 0.3800 Epoch 4/5 20/20 [==============================] - 0s 6ms/step - loss: 0.9022 - sparse_categorical_accuracy: 0.4700 - val_loss: 0.8866 - val_sparse_categorical_accuracy: 0.6000 Epoch 5/5 20/20 [==============================] - 0s 6ms/step - loss: 0.7675 - sparse_categorical_accuracy: 0.7000 - val_loss: 0.8630 - val_sparse_categorical_accuracy: 0.6400 WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version. Instructions for updating: This property should not be used in TensorFlow 2.0, as updates are applied automatically. Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version. Instructions for updating: This property should not be used in TensorFlow 2.0, as updates are applied automatically. Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version. Instructions for updating: This property should not be used in TensorFlow 2.0, as updates are applied automatically. Warning:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version. Instructions for updating: This property should not be used in TensorFlow 2.0, as updates are applied automatically. INFO:tensorflow:Assets written to: /tmp/tfx-interactive-2020-11-26T10_11_55.747278-71_skpdw/Trainer/model/4/serving_model_dir/assets INFO:tensorflow:Assets written to: /tmp/tfx-interactive-2020-11-26T10_11_55.747278-71_skpdw/Trainer/model/4/serving_model_dir/assets
モデルを評価してプッシュする
Pusher
コンポーネントを使用してモデルをサービングディレクトリにプッシュする前に、 Evaluator
コンポーネントを使用してモデルを評価し、「祝福」します。
_serving_model_dir = os.path.join(tempfile.mkdtemp(),
'serving_model/penguins_classification')
eval_config = tfma.EvalConfig(
model_specs=[
tfma.ModelSpec(label_key='species', signature_name='serving_default')
],
metrics_specs=[
tfma.MetricsSpec(metrics=[
tfma.MetricConfig(
class_name='SparseCategoricalAccuracy',
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': 0.6})))
])
],
slicing_specs=[tfma.SlicingSpec()])
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
schema=infer_schema.outputs['schema'],
eval_config=eval_config)
interactive_context.run(evaluator)
WARNING:absl:"maybe_add_baseline" and "maybe_remove_baseline" are deprecated, please use "has_baseline" instead.
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=evaluator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=_serving_model_dir)))
interactive_context.run(pusher)
TFXパイプラインを実行すると、MLMDデータベースにデータが入力されます。次のセクションでは、MLMD APIを使用して、このデータベースにメタデータ情報を照会します。
MLMDデータベースを照会する
MLMDデータベースには、次の3種類のメタデータが格納されています。
- パイプラインに関するメタデータとパイプラインコンポーネントに関連付けられた系統情報
- パイプラインの実行中に生成されたアーティファクトに関するメタデータ
- パイプラインの実行に関するメタデータ
一般的な本番環境のパイプラインは、新しいデータが到着すると複数のモデルに対応します。提供されたモデルで誤った結果が発生した場合は、MLMDデータベースにクエリを実行して、誤ったモデルを特定できます。次に、これらのモデルに対応するパイプラインコンポーネントの系統をトレースして、モデルをデバッグできます。
MLMDデータベースにクエリを実行するために、前に定義したInteractiveContext
を使用してメタデータ(MD)ストアを設定します。
connection_config = interactive_context.metadata_connection_config
store = mlmd.MetadataStore(connection_config)
# All TFX artifacts are stored in the base directory
base_dir = connection_config.sqlite.filename_uri.split('metadata.sqlite')[0]
MDストアからのデータを表示するためのいくつかのヘルパー関数を作成します。
def display_types(types):
# Helper function to render dataframes for the artifact and execution types
table = {'id': [], 'name': []}
for a_type in types:
table['id'].append(a_type.id)
table['name'].append(a_type.name)
return pd.DataFrame(data=table)
def display_artifacts(store, artifacts):
# Helper function to render dataframes for the input artifacts
table = {'artifact id': [], 'type': [], 'uri': []}
for a in artifacts:
table['artifact id'].append(a.id)
artifact_type = store.get_artifact_types_by_id([a.type_id])[0]
table['type'].append(artifact_type.name)
table['uri'].append(a.uri.replace(base_dir, './'))
return pd.DataFrame(data=table)
def display_properties(store, node):
# Helper function to render dataframes for artifact and execution properties
table = {'property': [], 'value': []}
for k, v in node.properties.items():
table['property'].append(k)
table['value'].append(
v.string_value if v.HasField('string_value') else v.int_value)
for k, v in node.custom_properties.items():
table['property'].append(k)
table['value'].append(
v.string_value if v.HasField('string_value') else v.int_value)
return pd.DataFrame(data=table)
まず、MDストアに、保存されているすべてのArtifactTypes
リストを照会します。
display_types(store.get_artifact_types())
次に、すべてのPushedModel
アーティファクトをクエリします。
pushed_models = store.get_artifacts_by_type("PushedModel")
display_artifacts(store, pushed_models)
MDストアに最新のプッシュモデルを照会します。このチュートリアルには、プッシュされたモデルが1つだけあります。
pushed_model = pushed_models[-1]
display_properties(store, pushed_model)
プッシュされたモデルをデバッグする最初のステップの1つは、プッシュされたトレーニング済みモデルを確認し、そのモデルのトレーニングに使用されているトレーニングデータを確認することです。
MLMDは、モデルの来歴を分析するために使用できる来歴グラフをウォークスルーするためのトラバーサルAPIを提供します。
def get_one_hop_parent_artifacts(store, artifacts):
# Get a list of artifacts within a 1-hop of the artifacts of interest
artifact_ids = [artifact.id for artifact in artifacts]
executions_ids = set(
event.execution_id
for event in store.get_events_by_artifact_ids(artifact_ids)
if event.type == metadata_store_pb2.Event.OUTPUT)
artifacts_ids = set(
event.artifact_id
for event in store.get_events_by_execution_ids(executions_ids)
if event.type == metadata_store_pb2.Event.INPUT)
return [artifact for artifact in store.get_artifacts_by_id(artifacts_ids)]
プッシュされたモデルの親アーティファクトをクエリします。
parent_artifacts = get_one_hop_parent_artifacts(store, [pushed_model])
display_artifacts(store, parent_artifacts)
モデルのプロパティをクエリします。
exported_model = parent_artifacts[0]
display_properties(store, exported_model)
モデルのアップストリームアーティファクトをクエリします。
model_parents = get_one_hop_parent_artifacts(store, [exported_model])
display_artifacts(store, model_parents)
モデルがトレーニングされたトレーニングデータを取得します。
used_data = model_parents[0]
display_properties(store, used_data)
モデルがトレーニングしたトレーニングデータができたので、データベースに再度クエリを実行して、トレーニングステップ(実行)を見つけます。登録されている実行タイプのリストについて、MDストアにクエリを実行します。
display_types(store.get_execution_types())
トレーニングステップがあるExecutionType
という名前のtfx.components.trainer.component.Trainer
。 MDストアをトラバースして、プッシュされたモデルに対応するトレーナーを実行します。
def find_producer_execution(store, artifact):
executions_ids = set(
event.execution_id
for event in store.get_events_by_artifact_ids([artifact.id])
if event.type == metadata_store_pb2.Event.OUTPUT)
return store.get_executions_by_id(executions_ids)[0]
trainer = find_producer_execution(store, exported_model)
display_properties(store, trainer)
概要
このチュートリアルでは、MLMDを活用してTFXパイプラインコンポーネントの系統を追跡し、問題を解決する方法について学習しました。
MLMDの使用方法の詳細については、次の追加リソースを確認してください。