本頁面由 Cloud Translation API 翻譯而成。
Switch to English

TensorFlow轉換入門

本指南介紹了tf.Transform的基本概念以及如何使用它們。它會:

  • 定義一個預處理功能 ,即對管道的邏輯描述,該管道將原始數據轉換為用於訓練機器學習模型的數據。
  • 通過將預處理功能轉換為Beam管道,展示用於轉換數據的Apache Beam實現。
  • 顯示其他用法示例。

定義預處理功能

預處理功能tf.Transform最重要的概念。預處理功能是數據集轉換的邏輯描述。預處理函數接受並返回張量字典,其中張量表示Tensor或2D SparseTensor 。有兩種函數可用於定義預處理功能:

  1. 接受並返回張量的任何函數。這些將TensorFlow操作添加到圖形,從而將原始數據轉換為轉換後的數據。
  2. tf.Transform提供的任何分析器 。分析器還接受並返回張量,但與TensorFlow函數不同,它們向圖形添加操作。而是由分析器使tf.Transform在TensorFlow之外計算全通運算。他們使用整個數據集中的輸入張量值來生成恆定張量,並將其作為輸出返回。例如, tft.min計算數據集中的張量的最小值。 tf.Transform提供了一組固定的分析器,但是將在以後的版本中進行擴展。

預處理功能示例

通過將分析器和常規的TensorFlow功能結合起來,用戶可以創建靈活的管道來轉換數據。以下預處理功能以不同方式轉換了三個功能中的每一個,並結合了兩個功能:

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

在此, xys是代表輸入特徵的Tensor 。通過將tft.mean應用於x並從x減去來創建第一個創建的新張量x_centeredtft.mean(x)返回表示張量x平均值的張量。 x_centered是張量x減去均值。

以相似的方式創建第二個新張量y_normalized ,但使用便利方法tft.scale_to_0_1 。此方法執行類似於計算x_centered ,即計算最大值和最小值,並使用它們來縮放y

張量s_integerized顯示了字符串操作的示例。在這種情況下,我們採用字符串並將其映射為整數。這使用了便利功能tft.compute_and_apply_vocabulary 。此函數使用分析器計算輸入字符串獲取的唯一值,然後使用TensorFlow操作將輸入字符串轉換為唯一值表中的索引。

最後一列顯示可以通過組合張量來使用TensorFlow操作來創建新特徵。

預處理功能定義了數據集上的一系列操作。為了應用管道,我們依賴於tf.Transform API的具體實現。 Apache Beam實現提供了PTransform ,該函數將用戶的預處理功能應用於數據。 tf.Transform用戶的典型工作流程將構造一個預處理功能,然後將其合併到更大的Beam管道中,以創建用於訓練的數據。

批處理

批處理是TensorFlow的重要組成部分。由於tf.Transform的目標之一是為預處理提供一個TensorFlow圖,可以將其合併到服務圖(以及可選的訓練圖)中,因此批處理也是tf.Transform的重要概念。

儘管在上面的例子中並不明顯,用戶定義的預處理函數傳遞代表批次而不是單個實例張量,訓練和與TensorFlow服務期間發生的情況。另一方面,分析器對整個數據集執行計算,該計算返回單個值而不是一批值。 x是具有(batch_size,)形狀的Tensor ,而tft.mean(x)是具有()形狀的Tensor 。減去x - tft.mean(x)廣播,其中從x表示的批次的每個元素中減去tft.mean(x)的值。

Apache Beam實施

預處理功能旨在作為對在多個數據處理框架上實現的預處理管道的邏輯描述,而tf.Transform提供了在Apache Beam上使用的規範實現。此實現演示了實現所需的功能。此功能沒有正式的API,因此每個實現都可以使用針對其特定數據處理框架的慣用API。

Apache Beam實現提供了兩個PTransform用於處理預處理功能的數據。下面顯示了複合PTransform AnalyzeAndTransformDataset的用法:

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

下面顯示了transformed_data內容,其中包含與原始數據格式相同的轉換列。特別是, s_integerized的值為[ s_integerized [0, 1, 0] -這些值取決於確定性,如何將單詞helloworld映射為整數。對於x_centered列,我們減去了平均值,因此x的值[ x_centered [1.0, 2.0, 3.0]變為[-1.0, 0.0, 1.0] 。同樣,其餘的列與它們的期望值匹配。

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

raw_datatransformed_data都是數據集。接下來的兩節顯示Beam實現如何表示數據集以及如何向磁盤讀取和寫入數據。另一個返回值transform_fn表示應用於數據的轉換,下面將詳細介紹。

AnalyzeAndTransformDataset是實現AnalyzeDatasetTransformDataset提供的兩個基本轉換的組成。因此,以下兩個代碼段是等效的:

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn是一個純函數,表示應用於數據集每一行的操作。特別是,分析器值已經被計算並視為常數。在此示例中, transform_fn包含x列的平均值, y列的最小值和最大值以及用於將字符串映射為整數的詞彙表作為常量。

tf.Transform一個重要功能是transform_fn表示行上的映射-它是分別應用於每行的純函數。匯總行的所有計算都在AnalyzeDataset完成。此外, transform_fn被表示為TensorFlow Graph其可以被嵌入到所述服務曲線圖。

在這種特殊情況下,提供AnalyzeAndTransformDataset進行優化。這與scikit-learn中使用的模式相同,提供了fittransformfit_transform方法。

數據格式和架構

在前面的代碼示例中,省略了定義raw_data_metadata的代碼。元數據包含定義數據佈局的架構,因此可以從多種格式讀取和寫入數據。即使上一節中顯示的內存中格式也不是自描述的,而是需要使用架構才能將其解釋為張量。

這是示例數據的模式定義:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

dataset_schema.Schema類包含將數據從其磁盤上或內存中的格式解析為張量所需的信息。通常通過調用帶有dict映射功能鍵的tf.io.FixedLenFeaturetf.io.VarLenFeaturetf.io.SparseFeature值的dict映射schema_utils.schema_from_feature_spec來構造它。有關更多詳細信息,請參見tf.parse_example的文檔。

在上面,我們使用tf.io.FixedLenFeature來指示每個功能都包含固定數量的值,在這種情況下為單個標量值。由於tf.Transform批處理實例,因此表示要素的實際Tensor將具有形狀(None,) ,其中未知尺寸為批處理尺寸。

用Apache Beam輸入和輸出

到目前為止,示例的數據格式已使用字典列表。這是一種簡化,它依賴於Apache Beam處理列表以及其主要數據表示形式PCollectionPCollection是形成Beam管道一部分的數據表示。通過應用各種PTransform (包括AnalyzeDatasetTransformDataset )並運行管道來形成Beam管道。 PCollection不在主二進製文件的內存中創建,而是分配在工作進程之間(儘管本節使用內存中執行模式)。

以下示例需要在磁盤上讀寫數據,並且需要將數據表示為PCollection (而不是列表),請參閱: census_example.py 。下面我們展示瞭如何下載數據並運行此示例。 “人口普查收入”數據集由UCI機器學習存儲庫提供 。該數據集包含分類和數字數據。

數據為CSV格式,這是前兩行:

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

數據集的列是分類的或數字的。由於有許多列,因此通過遍歷每種類型的所有列來生成Schema (類似於上一個示例)。該數據集描述了一個分類問題:預測個人年收入高於或低於50K的最後一列。但是,從tf.Transform的角度來看,此標籤只是另一個分類列。

使用此架構從CSV文件讀取數據。 ordered_columns常數按在CSV文件中出現的順序包含所有列的列表,這是必需的,因為架構不包含此信息。由於從CSV文件讀取時已經完成了一些額外的Beam轉換,因此已將其刪除。每個CSV行都以內存格式轉換為實例。

在此示例中,我們允許缺少education-num功能。這意味著,它被表示為一個tf.io.VarLenFeature在feature_spec,並且作為tf.SparseTensor在preprocessing_fn。為了處理可能缺少的要素值,我們用默認值(在本例中為0)填充缺失的實例。

converter = tft.coders.CsvCoder(ordered_columns, raw_data_schema)

raw_data = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...
    | 'DecodeTrainData' >> beam.Map(converter.decode))

預處理與前面的示例相似,除了預處理功能是通過編程方式生成的,而不是手動指定每列。在下面的預處理功能中, NUMERICAL_COLUMNSCATEGORICAL_COLUMNS是包含數字和類別列名稱的列表:

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

與上一個示例的不同之處在於,label列手動指定了從字符串到索引的映射。因此,將'>50'映射為0 ,將'<=50K'映射為1因為知道訓練模型中的哪個索引對應於哪個標籤非常有用。

raw_data變量表示一個PCollection ,它使用相同的AnalyzeAndTransformDataset轉換,包含與列表raw_data相同格式的數據(來自上一個示例)。該模式在兩個地方使用:從CSV文件讀取數據,以及作為AnalyzeAndTransformDataset輸入。 CSV格式和內存格式都必須與模式配對,才能將它們解釋為張量。

最後階段是將轉換後的數據寫入磁盤,並具有與讀取原始數據類似的形式。用於執行此操作的模式是AnalyzeAndTransformDataset輸出的一部分,該推斷AnalyzeAndTransformDataset輸出數據的模式。寫入磁盤的代碼如下所示。模式是元數據的一部分,但是在tf.Transform API中可互換使用兩者(即,將元數據傳遞給ExampleProtoCoder )。請注意,這會寫入不同的格式。可以使用Beam對TFRecord格式的內置支持來替代textio.WriteToText ,並使用編碼器將數據編碼為Example protos。這是用於培訓的更好格式,如下一節所示。 transformed_eval_data_base提供了寫入的各個分片的基本文件名。

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

除訓練數據外, transform_fn還與元數據一起寫出:

641

使用p.run().wait_until_finish()運行整個Beam管道。到此為止,Beam管道代表了延遲的分佈式計算。它提供了有關將要執行的操作的指令,但是指令尚未執行。此最終調用執行指定的管道。

下載人口普查數據集

使用以下shell命令下載人口普查數據集:

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

運行census_example.py腳本時,將包含此數據的目錄作為第一個參數傳遞。該腳本創建一個臨時子目錄以添加預處理的數據。

與TensorFlow培訓集成

census_example.py的最後一部分顯示瞭如何使用預處理的數據來訓練模型。有關詳細信息,請參見估算器文檔 。第一步是構造一個需要對預處理列進行描述的Estimator 。每個數字列都被描述為real_valued_column ,它是具有固定大小(在此示例中為1 )的密集向量的包裝。每個分類列都從字符串映射到整數,然後傳遞到indicator_columntft.TFTransformOutput用於查找每個分類功能的詞彙文件路徑。

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

下一步是創建一個生成器,以生成用於訓練和評估的輸入函數。 tf.Learn使用的訓練,因為不需要功能說明來解析轉換後的數據。而是,將元數據用於轉換後的數據以生成功能規範。

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

其餘代碼與使用Estimator類相同。該示例還包含以SavedModel格式導出模型的代碼。 Tensorflow ServingCloud ML Engine可以使用導出的模型。