MLコミュニティデーは11月9日です! TensorFlow、JAXからの更新のために私たちに参加し、より多くの詳細をご覧ください

TFXPython関数コンポーネントチュートリアル

このノートブックには、TFXInteractiveContext内およびローカルで調整されたTFXパイプラインでPython関数コンポーネントを作成および実行する方法の例が含まれています。

より多くのコンテキスト情報については、カスタムPythonの機能コンポーネントのTFXのドキュメントサイトのページを。

設定

最初にTFXをインストールし、必要なモジュールをインポートします。 TFXにはPython3が必要です。

システムのPythonバージョンを確認してください

import sys
sys.version
'3.7.5 (default, Feb 23 2021, 13:22:40) \n[GCC 8.4.0]'

アップグレードピップ

ローカルで実行しているときにシステムでPipをアップグレードしないようにするには、Colabで実行していることを確認してください。もちろん、ローカルシステムは個別にアップグレードできます。

try:
  import colab
  !pip install --upgrade pip
except:
  pass

TFXをインストールする

pip install -U tfx

ランタイムを再起動しましたか?

上記のセルを初めて実行するときにGoogleColabを使用している場合は、ランタイムを再起動する必要があります([ランタイム]> [ランタイムの再起動...])。これは、Colabがパッケージをロードする方法が原因です。

パッケージをインポートする

TFXをインポートしてバージョンを確認します。

# Check version
from tfx import v1 as tfx
tfx.__version__
'1.2.0'

カスタムPython関数コンポーネント

このセクションでは、Python関数からコンポーネントを作成します。実際のMLの問題は発生しません。これらの単純な関数は、Python関数コンポーネントの開発プロセスを説明するために使用されています。

参照してくださいPython関数ベースのコンポーネントのガイドをよりドキュメントについて。

Pythonカスタムコンポーネントを作成する

まず、ダミーデータを生成する関数を作成します。これは、独自のPythonモジュールファイルに書き込まれます。

%%writefile my_generator.py

import os
import tensorflow as tf  # Used for writing files.

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py

次に、生成されたダミーデータを使用する2番目のコンポーネントを記述します。データのハッシュを計算して返すだけです。

%%writefile my_consumer.py

import hashlib
import os
import tensorflow as tf

from tfx import v1 as tfx

# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String

@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculate."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set to the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py

InteractiveContextを使用してノートブックで実行する

次に、TFXInteractiveContextでの新しいコンポーネントの使用法を示します。

あなたはTFXノートInteractiveContextで何ができるかの詳細については、中・ノートを参照TFX Kerasコンポーネントのチュートリアルを

from my_generator import MyGenerator
from my_consumer import MyConsumer

InteractiveContextを構築する

# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmp/tfx-interactive-2021-09-30T02_05_44.574837-wib0wzgy as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmp/tfx-interactive-2021-09-30T02_05_44.574837-wib0wzgy/metadata.sqlite.

対話形式のコンポーネントを実行しますcontext.run()

次に、我々がノートブックの中に対話的に私たちのコンポーネントを実行context.run()私たちのコンシューマーコンポーネントは、ジェネレーターコンポーネントの出力を使用します。

generator = MyGenerator()
context.run(generator)
WARNING: Logging before InitGoogleLogging() is written to STDERR
I0930 02:05:44.621371 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)
I0930 02:05:44.660826 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type

実行後、ディスク上のコンシューマコンポーネントの「ハッシュ」出力アーティファクトの内容を検査できます。

tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmp/tfx-interactive-2021-09-30T02_05_44.574837-wib0wzgy/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1

これで、独自のカスタムコンポーネントを作成して実行できました。

パイプライン定義を書く

次に、これらの同じコンポーネントを使用してパイプラインを作成します。使用している間InteractiveContextノートブック内は実験のためにうまく機能し、パイプラインを定義することで、生産使用のために、ローカルまたはリモートのランナーにあなたのパイプラインを展開することができます。

ここでは、マシン上でローカルに実行されているLocalDagRunnerの使用法を示します。本番環境での実行には、AirflowまたはKubeflowランナーの方が適している場合があります。

パイプラインを構築する

import os
import tempfile
from tfx import v1 as tfx

# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)

my_pipeline = function_based_pipeline()

であなたのパイプラインを実行しますLocalDagRunner

tfx.orchestration.LocalDagRunner().run(my_pipeline)
I0930 02:05:44.833285 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:05:44.840090 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:05:44.846822 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:05:44.853597 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:05:44.863385 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:05:44.878916 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:05:44.894802 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type
I0930 02:05:44.913914 16837 rdbms_metadata_access_object.cc:686] No property is defined for the Type

このパイプラインの実行によって生成された出力アーティファクトを検査できます。

find {PIPELINE_ROOT}
/tmp/tmpm4plox37
/tmp/tmpm4plox37/metadata.sqlite
/tmp/tmpm4plox37/MyConsumer
/tmp/tmpm4plox37/MyConsumer/.system
/tmp/tmpm4plox37/MyConsumer/.system/executor_execution
/tmp/tmpm4plox37/MyConsumer/.system/executor_execution/2
/tmp/tmpm4plox37/MyConsumer/hash
/tmp/tmpm4plox37/MyConsumer/hash/2
/tmp/tmpm4plox37/MyConsumer/hash/2/value
/tmp/tmpm4plox37/MyGenerator
/tmp/tmpm4plox37/MyGenerator/data
/tmp/tmpm4plox37/MyGenerator/data/1
/tmp/tmpm4plox37/MyGenerator/data/1/data_file.txt
/tmp/tmpm4plox37/MyGenerator/.system
/tmp/tmpm4plox37/MyGenerator/.system/executor_execution
/tmp/tmpm4plox37/MyGenerator/.system/executor_execution/1

これで、独自のカスタムコンポーネントを作成し、LocalDagRunnerでの実行を調整しました。次の手順については、上の追加のチュートリアルとガイドをチェックアウトTFXのウェブサイト