ヘルプKaggleにTensorFlowグレートバリアリーフを保護チャレンジに参加

TensorFlowTransformの使用を開始する

このガイドでは、の基本的な概念紹介tf.Transformし、それらを使用する方法。そうなる:

  • データに生データを変換パイプラインの論理的な説明がモデルを学習機械を訓練するために使用され、前処理関数を定義します。
  • 表示Apacheのビームビームパイプラインに前処理機能を変換てデータを変換するために使用される実装を。
  • 追加の使用例を示します。

前処理関数を定義する

前処理機能は、最も重要な概念ですtf.Transform 。前処理関数は、データセットの変換の論理記述です。前処理機能を受け入れ、テンソル手段テンソルの辞書、戻りTensor又はSparseTensor 。前処理関数を定義するために使用される関数には、次の2種類があります。

  1. テンソルを受け入れて返す関数。これらは、生データを変換されたデータに変換するTensorFlow操作をグラフに追加します。
  2. が提供する分析器の任意のtf.Transform 。アナライザも受け入れ、テンソルを返しますが、TensorFlow機能とは異なり、彼らはグラフに操作を追加しないでください。代わりに、原因の分析装置tf.Transform TensorFlowの完全なパス操作の外を計算します。データセット全体の入力テンソル値を使用して、出力として返される定数テンソルを生成します。例えば、 tft.min 、データセット上テンソルの最小値を計算します。 tf.Transform分析装置の固定されたセットを提供し、これは将来のバージョンで拡張されます。

前処理機能の例

アナライザーと通常のTensorFlow関数を組み合わせることで、ユーザーはデータを変換するための柔軟なパイプラインを作成できます。次の前処理関数は、3つの機能のそれぞれをさまざまな方法で変換し、2つの機能を組み合わせます。

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

ここで、 xy及びsれるTensorの入力機能を表すこと。 、作成された最初の新しいテンソルx_centered 、印加することによって構築されるtft.meanxとからこれを減算するxtft.mean(x)テンソルの平均を表すテンソル返すxx_centeredあるテンソルx平均値とが減算されます。

2番目の新しいテンソル、 y_normalized 、同様の方法で作成したが、便利な方法使用しているtft.scale_to_0_1 。この方法は、コンピューティングに似ていx_centered 、即ち最大値と最小値を計算し、スケーリングするためにこれらを使用してy

テンソルs_integerizedショー文字列操作の例。この場合、文字列を取得して整数にマップします。これは便利な機能の使用tft.compute_and_apply_vocabulary 。この関数は、アナライザーを使用して入力文字列が取得した一意の値を計算し、TensorFlow操作を使用して入力文字列を一意の値のテーブルのインデックスに変換します。

最後の列は、TensorFlow操作を使用して、テンソルを組み合わせることで新しい機能を作成できることを示しています。

前処理関数は、データセットに対する操作のパイプラインを定義します。パイプラインを適用するために、我々は具体的な実装に依存しているtf.Transform API。 Apacheのビームの実装は提供PTransformデータへのユーザーの前処理機能を適用します。典型的なワークフローtf.Transformユーザは、その後、訓練のためのデータを作成する、より大きなビームパイプラインにこれを組み込む、前処理関数を構築します。

バッチ処理

バッチ処理はTensorFlowの重要な部分です。目標の1つはtf.Transformサービンググラフ(及び、必要に応じて、トレーニンググラフ)に組み込むことができることを前処理するためのTensorFlowグラフを提供することで、バッチ処理はまた、重要な概念であるtf.Transform

上記の例では明らかではないが、ユーザは、トレーニングとTensorFlowとなる時に起こるように関数は、バッチではなく個々のインスタンスを表すテンソルを渡される前処理定義しました。一方、アナライザーは、データセット全体に対して計算を実行し、値のバッチではなく単一の値を返します。 xあるTensorの形状(batch_size,)しながら、 tft.mean(x)あるTensorの形状に()サブトラクションx - tft.mean(x)の値をブロードキャストtft.mean(x)バッチで表さのすべての要素から減算されx

ApacheBeamの実装

前処理機能は、複数のデータ処理フレームワーク上に実装前処理パイプラインの論理的説明として意図されている間、 tf.Transform Apacheのビーム上で使用される正規の実装を提供します。この実装は、実装に必要な機能を示しています。この機能のための正式なAPIはないため、各実装では、特定のデータ処理フレームワークに固有のAPIを使用できます。

Apacheのビームの実装は、二つの提供PTransform前処理機能のためのプロセスに使用されるデータを。以下に示した複合の利用PTransform AnalyzeAndTransformDataset

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

transformed_data内容は以下のとおりと生データと同じ形式で変換列が含まれています。具体的には、の値s_integerizedれる[0, 1, 0] -これらの値は、単語の方法によって異なりhelloworld決定論的であり、整数にマッピングしました。カラムがためx_centered 、我々は、列の値に平均値を減算xた、 [1.0, 2.0, 3.0]となった[-1.0, 0.0, 1.0]同様に、残りの列は期待値と一致します。

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

どちらもraw_datatransformed_dataデータセットです。次の2つのセクションでは、Beamの実装がデータセットを表す方法と、データをディスクに読み書きする方法を示します。他の戻り値は、 transform_fn 、以下に詳細に覆われたデータに適用される変換を表します。

AnalyzeAndTransformDataset実装によって提供される二つの基本的変換の組成物でAnalyzeDatasetTransformDataset 。したがって、次の2つのコードスニペットは同等です。

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn 、データセットの各列に適用される操作を表す純粋な関数です。特に、アナライザーの値はすでに計算され、定数として扱われます。一例では、 transform_fn定数として含まれている列の平均値x 、分およびカラムの最大値y 、および整数への文字列をマップするために使用される語彙。

重要な特徴tf.Transformつまりtransform_fn個別の行に適用される純関数である-it行上マップを表します。集計行の計算のすべてがで行われAnalyzeDataset 。さらに、 transform_fn TensorFlowとして表されるGraphサービンググラフに埋め込むことができます。

AnalyzeAndTransformDatasetこの特殊なケースでの最適化のために提供されます。これはで使用したのと同じパターンであるscikit-学ぶ提供し、 fittransform 、およびfit_transform方法。

データ形式とスキーマ

TFTビームの実装は、2つの異なる入力データ形式を受け入れます。 「インスタンス辞書」形式(上記及び例に見られるようにsimple_example.py )直感的な形式で、TFXIO(ながら小さなデータセットに適しているApacheの矢印)フォーマットが改善された性能を提供し、大規模なデータセットのsuitbleあります。

Beamの実装は、PCollectionに付随する「メタデータ」によって入力PCollectionがどの形式になるかを示します。

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • 場合raw_data_metadataあるdataset_metadata.DatasetMetadata (「『インスタンス辞書』フォーマット」セクション、以下を参照のこと)、次いでraw_data 「インスタンス辞書」の形式であることが期待されます。
  • 場合raw_data_metadataあるtfxio.TensorAdapterConfig (以下「TFXIOフォーマット」の項を参照)、次いでraw_data TFXIO形式であることが期待されます。

「インスタンスディクト」形式

前のコード例では、コード定義raw_data_metadata省略されています。メタデータには、データのレイアウトを定義するスキーマが含まれているため、さまざまな形式でデータの読み取りと書き込みが行われます。前のセクションで示したメモリ内フォーマットでさえ、自己記述的ではなく、テンソルとして解釈されるためにスキーマを必要とします。

サンプルデータのスキーマの定義は次のとおりです。

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

Schemaプロトはテンソルに、そのディスク上またはメモリ内のフォーマットからのデータを解析するために必要な情報が含まれています。これは、一般的に呼び出すことにより構成されschema_utils.schema_from_feature_specする機能キーをマッピングする辞書でtf.io.FixedLenFeaturetf.io.VarLenFeature 、およびtf.io.SparseFeature値。ドキュメントを参照してくださいtf.parse_example詳細については。

我々が使用する上記tf.io.FixedLenFeatureこの場合には、各機能が値の固定数が含まれていることを示すために、単一のスカラー値を。のでtf.Transformバッチインスタンス、実際のTensor特徴を表す形状になります(None,) 、未知の寸法は、バッチの寸法です。

TFXIOフォーマット

このフォーマットでは、データが中に含まれると予想されるpyarrow.RecordBatch 。表形式のデータについては、当社のApacheビームの実装では、矢印受け入れRecordBatch次のタイプの列で構成ESを:

  • pa.list_(<primitive>) <primitive>であるpa.int64() pa.float32() pa.binary()またはpa.large_binary()

  • pa.large_list(<primitive>)

以下のように表さ我々は上記の使用おもちゃの入力データセット、 RecordBatch 、次のようになります。

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

DatasetMetadataが「インスタンス辞書」形式に同行するために必要なさと同様に、 tfxio.TensorAdapterConfig同行するために必要とされるRecordBatch ESを。これは矢印スキーマから成るRecordBatch ES、およびTensorRepresentations一意の列方法を決定するためにRecordBatch ESは(含むなくtf.Tensor、tf.SparseTensorに限ら)TensorFlowテンソルとして解釈することができます。

TensorRepresentationsあるDict[Text, TensorRepresentation]ことテンソルの関係を確立preprocessing_fnで受け取り、列RecordBatch ES。例えば:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

手段そのinputs['x']preprocessing_fn緻密であるべきで、その値名前の列から来るtf.Tensor、 'col1'の入力におけるRecordBatch ES、およびその(バッチ)形状であるべきである[batch_size, 2]

TensorRepresentationで定義されているProtobufあるTensorFlowメタデータ

TensorFlowとの互換性

tf.Transformエクスポートするためのサポートを提供transform_fn TF 1.xまたはTF 2.xのSavedModelのいずれかと上記。前にデフォルトの動作0.30リリースには、TF 1.xのSavedModelを輸出しました。以降では0.30のリリース、デフォルトの動作では、2.xの行動は(呼び出すことで、明示的に無効にされTFない限り、TF 2.xのSavedModelをエクスポートすることですtf.compat.v1.disable_v2_behavior()など)。

以下のようなTF 1.xの概念使用している場合はEstimatorsSessions 、あなたが渡すことで、以前の動作を保持することができますforce_tf_compat_v1=Trueするtft_beam.Context使用している場合tf.Transformスタンドアロンのライブラリとしてまたはに変換TFXのコンポーネント。

エクスポート時transform_fn TF 2.xのSavedModelとして、 preprocessing_fn用いて追跡可能であると予想されるtf.function 。また、(と例えばリモートであなたのパイプラインを実行している場合DataflowRunner )、ことを確認してくださいpreprocessing_fn説明するようにとの依存関係が適切にパッケージされているここに

使用に関する既知の問題tf.Transform TF 2.xのSavedModelをエクスポートするためには、文書化されてここに

ApacheBeamによる入力と出力

これまでのところ、我々は(のpythonのリストの入力データと出力データを見てきましたRecordBatch ESまたはインスタンスの辞書)。これはリストだけでなく、データのその主要な表現、で動作するようにApacheのビームの能力に依存して単純化であるPCollection

PCollectionビームパイプラインの一部を形成するデータ表現です。ビームパイプラインは、様々な適用することによって形成されるPTransform S、を含むAnalyzeDatasetTransformDataset 、パイプラインを実行しています。 A PCollectionメインバイナリのメモリ内に作成されず、その代わりに(このセクションはメモリ内の実行モードを使用しているが)労働者の間で分配されます。

プレ缶詰PCollectionソース( TFXIO

RecordBatch私たちの実装が受け入れるフォーマットは、他のTFXライブラリが受け入れることを共通のフォーマットです。したがって、TFX申し出便利(別名「ソース」 TFXIOディスクや農産物に様々な形式のファイルを読み込む) RecordBatch ESとも与えることができますTensorAdapterConfig推論含め、 TensorRepresentations

これらTFXIO Sは、パッケージに記載されていますtfx_bsltfx_bsl.public.tfxio )。

例:「国勢調査収入」データセット

次の例では、読み取りおよびディスクにデータを書き込むの両方を必要としてデータを表すPCollection (ないリスト)、以下を参照してください。 census_example.py 。以下に、データをダウンロードしてこの例を実行する方法を示します。提供されたデータセット「国勢調査所得」 UCI機械学習リポジトリ。このデータセットには、カテゴリデータと数値データの両方が含まれています。

データはCSV形式です。最初の2行は次のとおりです。

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

データセットの列は、カテゴリまたは数値のいずれかです。このデータセットは、分類の問題について説明しています。つまり、個人が年間5万以上または5万未満の収入を得る最後の列を予測します。ただし、の観点からtf.Transform 、このラベルはちょうど別のカテゴリの列です。

私たちは、事前に缶詰の使用TFXIOBeamRecordCsvTFXIOへのCSVライン翻訳しRecordBatchesTFXIO情報の二つの重要な部分を必要とします。

  • TensorFlowメタデータスキーマ各CSVカラム約種類や形状情報を含んでいます。 TensorRepresentation Sはスキーマの任意の部分です。提供されていない場合(この例の場合)、タイプと形状の情報から推測されます。一つは、我々は(この例で示す)TFパーススペックから変換するために提供ヘルパー関数を使用することにより、または実行することにより、いずれかのスキーマを取得することができますTensorFlowデータの検証を

  • CSVファイルに表示される順序での列名のリスト。これらの名前は、スキーマの機能名と一致する必要があることに注意してください。

この例では、我々はできるようにeducation-num機能が欠落していることを。それは次のように表現されていることを、この手段tf.io.VarLenFeature feature_specで、およびAS tf.SparseTensorpreprocessing_fn 。その他の機能はなりますtf.Tensorの同じ名前の中preprocessing_fn

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

。CSVの行が読み込まれた後、我々はいくつかの追加修正アップをしなければならなかったことをそれ以外の場合、我々はに依存している可能性があり注意CsvTFXIO両方のファイルを読み込むとに変換する処理するためにRecordBatch ES:

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

前処理は前の例と似ていますが、前処理関数が各列を手動で指定する代わりにプログラムで生成される点が異なります。以下の前処理機能では、 NUMERICAL_COLUMNSCATEGORICAL_COLUMNS 、数値とカテゴリ列の名前を含むリストは以下のとおりです。

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

前の例との違いの1つは、ラベル列が文字列からインデックスへのマッピングを手動で指定することです。だから'>50'にマッピングされて0'<=50K'にマッピングされている1 、それはラベルに訓練されたモデルの対応でどのインデックスを知っておくと便利だからです。

record_batches変数は表しPCollectionpyarrow.RecordBatch ESを。 tensor_adapter_configによって与えられるcsv_tfxioから推測され、 SCHEMA (そして最終的に、この例では、仕様を解析TFから)。

最終段階は、変換されたデータをディスクに書き込むことであり、生データの読み取りと同様の形式になります。これを行うために使用されるスキーマは、出力の一部であるAnalyzeAndTransformDataset出力データのスキーマを推論します。ディスクに書き込むコードを以下に示します。スキーマは、メタデータの一部ですが、中には交換可能に2を使用していますtf.Transform API(すなわちにメタデータを渡すExampleProtoCoder )。これは別の形式で書き込むことに注意してください。代わりにtextio.WriteToText 、ビームの組み込みのサポート使用TFRecordフォーマットとしてデータを符号化する符号化器を使用してExample PROTOS。次のセクションに示すように、これはトレーニングに使用するのに適した形式です。 transformed_eval_data_base書かれている個々の破片のための基本ファイル名を提供します。

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

トレーニングデータに加えて、 transform_fn 、メタデータで書き出されます。

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

全体のビームパイプライン実行p.run().wait_until_finish()この時点まで、Beamパイプラインは遅延分散計算を表しています。何が行われるかについての指示を提供しますが、指示は実行されていません。この最後の呼び出しは、指定されたパイプラインを実行します。

国勢調査データセットをダウンロードする

次のシェルコマンドを使用して国勢調査データセットをダウンロードします。

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

実行している場合census_example.pyスクリプトを、最初の引数として、このデータを含むディレクトリを渡します。スクリプトは、前処理されたデータを追加するための一時的なサブディレクトリを作成します。

TensorFlowトレーニングと統合する

最後のセクションcensus_example.py前処理データがモデルを訓練するために使用される方法を示しています。参照推定量のドキュメント詳細については、を。最初のステップは、構築することであるEstimator前処理カラムの説明を必要とします。各数値列は、以下のように記載されてreal_valued_column固定サイズ(と密ベクトルのラッパーである1この例では)。各カテゴリの列は、文字列から整数にマッピングされ、その後に渡さindicator_columntft.TFTransformOutput各カテゴリの機能のための語彙ファイルパスを見つけるために使用されます。

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

次のステップは、トレーニングと評価のための入力関数を生成するビルダーを作成することです。で使用されるトレーニングと異なっtf.Learn機能仕様以降は、変換されたデータを解析するために必要とされていません。代わりに、変換されたデータのメタデータを使用して機能仕様を生成します。

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

残りのコードは、使用するのと同じであるEstimatorクラス。例では、モデルをエクスポートするコードが含まれているSavedModelフォーマット。エクスポートされたモデルを使用することができサービングTensorflowまたはクラウドMLエンジン