SIG TFX-Addonsコミュニティに参加して、TFXをさらに改善するのを手伝ってください! SIGTFXに参加-アドオン

TensorFlowTransformの使用を開始する

このガイドでは、 tf.Transformの基本的な概念とその使用方法を紹介します。そうなる:

  • 前処理関数を定義します。これは、生データを機械学習モデルのトレーニングに使用されるデータに変換するパイプラインの論理記述です。
  • 前処理関数Beamパイプラインに変換してデータを変換するために使用されるApacheBeamの実装を示します。
  • 追加の使用例を示します。

前処理関数を定義する

前処理関数は、 tf.Transform最も重要な概念です。前処理関数は、データセットの変換の論理記述です。前処理関数は、テンソルの辞書を受け入れて返します。ここで、テンソルTensorまたはSparseTensor意味します。前処理関数を定義するために使用される関数には、次の2種類があります。

  1. テンソルを受け入れて返す関数。これらは、生データを変換されたデータに変換するTensorFlow操作をグラフに追加します。
  2. tf.Transformによって提供されるアナライザーのいずれか。アナライザーもテンソルを受け入れて返しますが、TensorFlow関数とは異なり、グラフに操作を追加しません。代わりに、原因の分析装置tf.Transform TensorFlowの完全なパス操作の外を計算します。データセット全体の入力テンソル値を使用して、出力として返される定数テンソルを生成します。たとえば、 tft.minは、データセットに対するテンソルの最小値を計算します。 tf.Transformは固定されたアナライザーのセットを提供しますが、これは将来のバージョンで拡張される予定です。

前処理機能の例

アナライザーと通常のTensorFlow関数を組み合わせることで、ユーザーはデータを変換するための柔軟なパイプラインを作成できます。次の前処理関数は、3つの機能のそれぞれを異なる方法で変換し、2つの機能を組み合わせます。

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

ここで、 xy 、およびsは、入力特徴を表すTensorです。 、作成された最初の新しいテンソルx_centered 、印加することによって構築されるtft.meanxとからこれを減算するxtft.mean(x)テンソルの平均を表すテンソル返すxx_centeredは、平均を差し引いたテンソルxです。

2番目の新しいテンソルy_normalizedも同様の方法で作成されますが、便利なメソッドtft.scale_to_0_1を使用します。この方法は、 x_centered計算と同様のことをx_centered 。つまり、最大値と最小値を計算し、これらを使用してyをスケーリングします。

テンソルs_integerizedは、文字列操作の例を示しています。この場合、文字列を取得して整数にマップします。これは、便利な関数tft.compute_and_apply_vocabulary使用します。この関数は、アナライザーを使用して入力文字列が取得した一意の値を計算し、TensorFlow操作を使用して入力文字列を一意の値のテーブルのインデックスに変換します。

最後の列は、TensorFlow操作を使用して、テンソルを組み合わせることで新しい機能を作成できることを示しています。

前処理関数は、データセットに対する操作のパイプラインを定義します。パイプラインを適用するために、 tf.Transform具体的な実装に依存しています。 Apache Beamの実装は、ユーザーの前処理機能をデータに適用するPTransformを提供します。 tf.Transformユーザーの一般的なワークフローでは、前処理関数を作成し、これをより大きなBeamパイプラインに組み込んで、トレーニング用のデータを作成します。

バッチ処理

バッチ処理はTensorFlowの重要な部分です。 tf.Transformの目標の1つは、サービンググラフ(およびオプションでトレーニンググラフ)に組み込むことができる前処理用のTensorFlowグラフを提供することであるため、バッチ処理もtf.Transform重要な概念tf.Transform

上記の例では明らかではありませんが、ユーザー定義の前処理関数には、トレーニング中やTensorFlowでの提供中に発生するように、個々のインスタンスではなくバッチを表すテンソルが渡されます。一方、アナライザーは、データセット全体に対して計算を実行し、値のバッチではなく単一の値を返します。 x(batch_size,)形状のTensorであり、 tft.mean(x)()形状のTensorです。サブトラクションx - tft.mean(x)の値をブロードキャストtft.mean(x)バッチで表さのすべての要素から減算されx

ApacheBeamの実装

前処理関数は、複数のデータ処理フレームワークに実装された前処理パイプラインの論理記述として意図されていますが、 tf.Transformは、 tf.Transform使用される標準的な実装を提供します。この実装は、実装に必要な機能を示しています。この機能のための正式なAPIはないため、各実装では、特定のデータ処理フレームワークに固有のAPIを使用できます。

Apache Beamの実装は、前処理機能のデータを処理するために使用される2つのPTransform提供します。以下に、複合PTransform AnalyzeAndTransformDataset使用法を示しPTransform AnalyzeAndTransformDataset

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

transformed_data内容を以下に示します。これには、生データと同じ形式の変換された列が含まれています。特に、 s_integerizedの値は[0, 1, 0] s_integerized [0, 1, 0]です。これらの値は、 helloおよびworldという単語が整数にどのようにマップされたかによって異なります。これは決定論的です。列x_centeredについては、平均を減算したため、列x値( [1.0, 2.0, 3.0] x_centered [1.0, 2.0, 3.0] )は[-1.0, 0.0, 1.0] [1.0, 2.0, 3.0]になりました。同様に、残りの列は期待値と一致します。

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

raw_datatransformed_dataはどちらもデータセットです。次の2つのセクションでは、Beamの実装がデータセットを表す方法と、データをディスクに読み書きする方法を示します。もう1つの戻り値transform_fnは、データに適用されるtransform_fnを表します。これについては、以下で詳しく説明します。

AnalyzeAndTransformDatasetは、実装AnalyzeDatasetTransformDatasetによって提供される2つの基本的な変換の構成です。したがって、次の2つのコードスニペットは同等です。

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fnは、データセットの各行に適用される操作を表す純粋関数です。特に、アナライザーの値はすでに計算され、定数として扱われます。この例では、 transform_fnは、列xの平均、列y最小値と最大値、および文字列を整数にマップするために使用される語彙が定数として含まれています。

重要な特徴tf.Transformつまりtransform_fn個別の行に適用される純関数である-it行上マップを表します。行を集約するためのすべての計算は、 AnalyzeDatasetで行われAnalyzeDataset 。さらに、 transform_fnは、サービンググラフに埋め込むことができるTensorFlow Graphとして表されます。

AnalyzeAndTransformDatasetは、この特別な場合の最適化のために提供されています。これはscikit-learnで使用されるのと同じパターンであり、 fittransform 、およびfit_transformメソッドをfit_transformます。

データ形式とスキーマ

TFTビームの実装は、2つの異なる入力データ形式を受け入れます。 「instancedict」形式(上記の例とsimple_example.py見られる)は直感的な形式であり、小さなデータセットに適していますが、TFXIO( Apache Arrow )形式はパフォーマンスが向上し、大きなデータセットに適しています。

Beamの実装は、PCollectionに付随する「メタデータ」によって入力PCollectionがどの形式になるかを示します。

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • raw_data_metadatadataset_metadata.DatasetMetadata場合(以下の「 'instance dict'形式」セクションを参照)、 raw_dataは「instancedict」形式であることが期待されます。
  • raw_data_metadatatfxio.TensorAdapterConfig (以下の「TFXIO形式」のセクションを参照)の場合、 raw_dataはTFXIO形式であることが期待されます。

「インスタンスディクト」形式

前のコード例では、 raw_data_metadata定義するコードは省略されています。メタデータには、データのレイアウトを定義するスキーマが含まれているため、さまざまな形式でデータの読み取りと書き込みが行われます。前のセクションで示したメモリ内フォーマットでさえ、自己記述的ではなく、テンソルとして解釈されるためにスキーマを必要とします。

サンプルデータのスキーマの定義は次のとおりです。

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

Schemaプロトには、データをディスク上またはメモリ内の形式からテンソルに解析するために必要な情報が含まれています。これは通常、機能キーをtf.io.FixedLenFeaturetf.io.VarLenFeature 、およびtf.io.SparseFeature値にマッピングするdictをschema_utils.schema_from_feature_specしてschema_utils.schema_from_feature_specを呼び出すことによって構築されます。詳細については、 tf.parse_exampleのドキュメントを参照してください。

上記では、 tf.io.FixedLenFeatureを使用して、各機能に固定数の値(この場合は単一のスカラー値)が含まれていることを示しています。 tf.Transformインスタンスをバッチ処理するため、フィーチャを表す実際のTensorは形状(None,)になります。ここで、未知の次元はバッチ次元です。

TFXIOフォーマット

この形式では、データはpyarrow.RecordBatch含まれていることが期待されます。表形式のデータの場合、Apache Beamの実装は、次のタイプの列で構成されるRecordBatch受け入れます。

  • pa.list_(<primitive>) 、ここで<primitive>pa.int64()pa.float32() pa.binary()またはpa.large_binary()です。

  • pa.large_list(<primitive>)

上記で使用したおもちゃの入力データセットは、 RecordBatchとして表される場合、次のようになります。

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

DatasetMetadataが「インスタンス辞書」形式に同行するために必要なさと同様に、 tfxio.TensorAdapterConfig同行するために必要とされるRecordBatch ESを。これは矢印スキーマから成るRecordBatch ES、およびTensorRepresentations一意の列方法を決定するためにRecordBatch ESは(含むなくtf.Tensor、tf.SparseTensorに限ら)TensorFlowテンソルとして解釈することができます。

TensorRepresentationsDict[Text, TensorRepresentation]あり、 preprocessing_fn受け入れるTensorとRecordBatch列との関係を確立します。例えば:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

preprocessing_fn inputs['x']は密なtf.Tensorである必要があり、その値は入力RecordBatchの名前'col1'列からRecordBatchれ、その(バッチ処理された)形状は[batch_size, 2]必要があります。

TensorRepresentationは、TensorFlow メタデータで定義されているProtobufです。

TensorFlowとの互換性

tf.Transformは、上記のtransform_fntf.Transformとしてエクスポートするためのサポートを提供します。 0.30リリース以前のデフォルトの動作では、TF 1.xSavedModelがエクスポートされました。 0.30リリース以降、デフォルトの動作では、TF 2.xの動作が明示的に無効にされていない限りtf.compat.v1.disable_v2_behavior()たとえば、 tf.compat.v1.disable_v2_behavior()を呼び出してtf.compat.v1.disable_v2_behavior() 、TF 2.xSavedModelをエクスポートします。

EstimatorsSessionsなどのtft_beam.Contextを使用している場合、 tf.Transformをスタンドアロンライブラリとして使用している場合はforce_tf_compat_v1=Truetft_beam.Context 、またはTFXのTransformコンポーネントに渡すことで、以前の動作を維持できます。

transform_fnをTF2.x SavedModelとしてエクスポートする場合、 preprocessing_fntf.functionを使用して追跡可能であることが期待されます。さらに、パイプラインをリモートで実行している場合(たとえば、 DataflowRunner )、 preprocessing_fnと依存関係がここで説明されているように適切にパッケージ化されていることを確認します

tf.Transformを使用してtf.Transformをエクスポートするtf.Transform既知の問題は、ここに記載されています

ApacheBeamによる入力と出力

これまで、( RecordBatchまたはインスタンス辞書の)Pythonリストで入力データと出力データを見てきました。これは、リストを操作するApache Beamの機能と、データの主要な表現であるPCollection依存する単純化です。

PCollectionは、Beamパイプラインの一部を形成するデータ表現です。ビームパイプラインは、 AnalyzeDatasetTransformDatasetなどのさまざまなPTransform適用し、パイプラインを実行することによって形成されます。 PCollectionはメインバイナリのメモリに作成されませんが、代わりにワーカー間で分散されます(ただし、このセクションではメモリ内実行モードを使用します)。

事前にPCollectionソース( TFXIO

私たちの実装が受け入れるRecordBatch形式は、他のTFXライブラリが受け入れる一般的な形式です。したがって、TFXは、ディスク上のさまざまな形式のファイルを読み取り、 RecordBatchを生成する便利な「ソース」(別名TFXIO )をTensorAdapterConfig 、推測されたTensorRepresentationsを含むTensorRepresentationsを提供することもできます。

これらのTFXIOは、パッケージtfx_bsltfx_bsl.public.tfxio )にあります。

例:「国勢調査収入」データセット

次の例では、読み取りおよびディスクにデータを書き込むの両方を必要としてデータを表すPCollection (ないリスト)、以下を参照してください。 census_example.py 。以下に、データをダウンロードしてこの例を実行する方法を示します。 「国勢調査収入」データセットは、 UCI機械学習リポジトリによって提供されます。このデータセットには、カテゴリデータと数値データの両方が含まれています。

データはCSV形式です。最初の2行は次のとおりです。

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

データセットの列は、カテゴリまたは数値のいずれかです。このデータセットは、分類の問題について説明しています。つまり、個人が年間5万ドル以上または5万ドル未満を稼ぐ最後の列を予測します。ただし、 tf.Transformの観点からは、このラベルは単なる別のカテゴリ列です。

事前にBeamRecordCsvTFXIOれたTFXIOBeamRecordCsvTFXIOを使用して、CSV行をRecordBatchesます。 TFXIOは、次の2つの重要な情報が必要です。

  • 各CSV列に関するタイプと形状の情報を含むTensorFlowメタデータスキーマTensorRepresentationは、スキーマのオプション部分です。提供されていない場合(この例の場合)、タイプと形状の情報から推測されます。スキーマを取得するには、TF解析仕様(この例に示されている)から変換するために提供されているヘルパー関数を使用するか、 TensorFlowデータ検証を実行します。

  • CSVファイルに表示される順序での列名のリスト。これらの名前は、スキーマの機能名と一致する必要があることに注意してください。

この例では、 education-num機能が欠落していることを許可します。それは次のように表現されていることを、この手段tf.io.VarLenFeature feature_specで、およびAS tf.SparseTensorpreprocessing_fn 。他の機能は、 preprocessing_fn同じ名前のtf.Tensorになります。

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

。CSVの行が読み込まれた後、我々はいくつかの追加修正アップをしなければならなかったことをそれ以外の場合、我々はに依存している可能性があり注意CsvTFXIO両方のファイルを読み込むとに変換する処理するためにRecordBatch ES:

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

前処理は前の例と似ていますが、前処理関数が各列を手動で指定する代わりにプログラムで生成される点が異なります。以下の前処理関数では、 NUMERICAL_COLUMNSおよびCATEGORICAL_COLUMNSは、数値列およびカテゴリー列の名前を含むリストです。

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

前の例との違いの1つは、ラベル列が文字列からインデックスへのマッピングを手動で指定することです。したがって、 '>50'0マップされ、 '<=50K'1マップされます。これは、トレーニングされたモデルのどのインデックスがどのラベルに対応するかを知るのに役立つためです。

record_batches変数は、 PCollectionpyarrow.RecordBatch表します。 tensor_adapter_configtensor_adapter_configによって指定されcsv_tfxio 。これは、 SCHEMAから(そして最終的には、この例ではTF解析仕様から)推測されます。

最終段階は、変換されたデータをディスクに書き込むことであり、生データの読み取りと同様の形式になります。これを行うために使用されるスキーマは、出力データのスキーマを推測するAnalyzeAndTransformDatasetの出力の一部です。ディスクに書き込むコードを以下に示します。スキーマは、メタデータの一部ですが、中には交換可能に2を使用していますtf.Transform API(すなわちにメタデータを渡すExampleProtoCoder )。これは別の形式で書き込むことに注意してください。 textio.WriteToText代わりに、 TFRecord形式に対するBeam​​の組み込みサポートを使用し、コーダーを使用してデータをExample TFRecordとしてエンコードします。これは、次のセクションに示すように、トレーニングに使用するのに適した形式です。 transformed_eval_data_baseは、書き込まれる個々のシャードのベースファイル名を提供します。

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

トレーニングデータに加えて、 transform_fnもメタデータとともに書き出されます。

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

p.run().wait_until_finish()p.run().wait_until_finish() Beamパイプライン全体を実行します。この時点まで、Beamパイプラインは遅延分散計算を表しています。何が行われるかについての指示を提供しますが、指示は実行されていません。この最後の呼び出しは、指定されたパイプラインを実行します。

国勢調査データセットをダウンロードする

次のシェルコマンドを使用して、国勢調査データセットをダウンロードします。

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

census_example.pyスクリプトを実行するときは、このデータを含むディレクトリを最初の引数として渡します。スクリプトは、前処理されたデータを追加するための一時的なサブディレクトリを作成します。

TensorFlowトレーニングと統合する

census_example.pyの最後のセクションは、前処理されたデータを使用してモデルをトレーニングする方法を示しています。詳細については、 Estimatorsのドキュメントを参照してください。最初のステップは、前処理された列の記述を必要とするEstimatorを構築することです。各数値列は、固定サイズ(この例では1 )の密なベクトルのラッパーであるreal_valued_columnとして記述されます。各カテゴリ列は文字列から整数にマップされてから、 indicator_columnに渡されindicator_columntft.TFTransformOutputは、各カテゴリ機能の語彙ファイルパスを見つけるために使用されます。

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

次のステップは、トレーニングと評価のための入力関数を生成するビルダーを作成することです。変換されたデータを解析するために機能仕様が必要ないため、 tf.Learn使用されるトレーニングとは異なります。代わりに、変換されたデータのメタデータを使用して機能仕様を生成します。

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

残りのコードは、 Estimatorクラスを使用する場合と同じです。この例には、モデルをSavedModel形式でエクスポートするためのコードも含まれてSavedModelます。エクスポートされたモデルを使用することができサービングTensorflowまたはクラウドMLエンジン