SIG TFX-Addons 커뮤니티에 가입하고 TFX를 더욱 향상시키는 데 도움을 주세요! SIG TFX 애드온 가입

TensorFlow Transform 시작하기

이 가이드에서는 tf.Transform 의 기본 개념과 사용 방법을 소개합니다. 그것은 :

  • 원시 데이터를 머신 러닝 모델 학습에 사용되는 데이터로 변환하는 파이프 라인의 논리적 설명 인 전처리 함수를 정의합니다.
  • 사전 처리 기능Beam 파이프 라인 으로 변환하여 데이터를 변환하는 데 사용되는 Apache Beam 구현을 보여줍니다.
  • 추가 사용 예를 표시합니다.

전처리 기능 정의

전처리 함수tf.Transform 의 가장 중요한 개념입니다. 전처리 기능은 데이터 세트 변환에 대한 논리적 설명입니다. 전처리 함수는 텐서 사전을 받아 반환합니다. 여기서 텐서Tensor 또는 SparseTensor 의미합니다. 전처리 함수를 정의하는 데 사용되는 두 종류의 함수가 있습니다.

  1. 텐서를 받아들이고 반환하는 모든 함수. 이렇게하면 원시 데이터를 변환 된 데이터로 변환하는 TensorFlow 작업이 그래프에 추가됩니다.
  2. tf.Transform 제공하는 모든 분석기 . 분석기는 또한 텐서를 수락하고 반환하지만 TensorFlow 함수와 달리 그래프에 작업을 추가 하지 않습니다 . 대신 분석기는 tf.Transform 이 TensorFlow 외부에서 전체 통과 작업을 계산하도록합니다. 전체 데이터 세트에서 입력 텐서 값을 사용하여 출력으로 반환되는 상수 텐서를 생성합니다. 예를 들어 tft.min 은 데이터 세트에 대한 텐서의 최소값을 계산합니다. tf.Transform 은 고정 된 분석기 세트를 제공하지만 향후 버전에서 확장 될 예정입니다.

전처리 기능 예

분석기와 일반 TensorFlow 함수를 결합하여 사용자는 데이터 변환을위한 유연한 파이프 라인을 만들 수 있습니다. 다음 전처리 함수는 세 가지 기능을 서로 다른 방식으로 변환하고 두 기능을 결합합니다.

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

여기서 x , ys 는 입력 특성을 나타내는 Tensor 입니다. 생성되는 새로운 제 텐서 x_centered 적용하여 구축 tft.meanx 로부터 이것을 감산 x . tft.mean(x) 는 텐서 x 의 평균을 나타내는 텐서를 반환합니다. x_centered 는 평균을 뺀 텐서 x 입니다.

두 번째 새 텐서 인 y_normalized 는 유사한 방식으로 생성되지만 편의 메서드 tft.scale_to_0_1 사용합니다. 이 방법은 x_centered 계산과 유사한 작업을 x_centered . 즉, 최대 값과 최소값을 계산하고이를 사용하여 y 를 확장합니다.

텐서 s_integerized 는 문자열 조작의 예를 보여줍니다. 이 경우 문자열을 정수로 매핑합니다. 이것은 편의 함수tft.compute_and_apply_vocabulary 사용합니다. 이 함수는 분석기를 사용하여 입력 문자열에서 가져온 고유 값을 계산 한 다음 TensorFlow 작업을 사용하여 입력 문자열을 고유 값 테이블의 인덱스로 변환합니다.

마지막 열은 TensorFlow 작업을 사용하여 텐서를 결합하여 새로운 기능을 생성 할 수 있음을 보여줍니다.

전처리 기능은 데이터 세트에 대한 작업 파이프 라인을 정의합니다. 파이프 라인을 적용하기 위해 우리는 tf.Transform API의 구체적인 구현에 의존합니다. Apache Beam 구현은 사용자의 사전 처리 기능을 데이터에 적용하는 PTransform 을 제공합니다. tf.Transform 사용자의 일반적인 워크 플로는 사전 처리 기능을 생성 한 다음이를 더 큰 Beam 파이프 라인에 통합하여 학습용 데이터를 생성합니다.

일괄 처리

일괄 처리는 TensorFlow의 중요한 부분입니다. tf.Transform 의 목표 중 하나는 제공 그래프 (및 선택적으로 학습 그래프)에 통합 할 수있는 전처리를위한 TensorFlow 그래프를 제공하는 tf.Transform 일괄 처리도 tf.Transform 에서 중요한 개념입니다.

위의 예에서는 명확하지 않지만 사용자 정의 전처리 함수는 TensorFlow를 사용하여 학습 및 제공하는 동안 발생하는 것처럼 개별 인스턴스가 아닌 배치를 나타내는 텐서를 전달합니다. 반면에 분석기는 값의 배치가 아닌 단일 값을 반환하는 전체 데이터 세트에 대해 계산을 수행합니다. x(batch_size,) 모양의 Tensor 이고, tft.mean(x)() 모양의 Tensor 입니다. 빼기 x - tft.mean(x) tft.mean(x)x 표현되는 배치의 모든 요소에서 tft.mean(x) 값을 tft.mean(x) 곳을 브로드 캐스트합니다.

Apache Beam 구현

사전 처리 기능 은 여러 데이터 처리 프레임 워크에 구현 된 사전 처리 파이프 라인에 대한 논리적 설명을 목적으로하지만 tf.Transform 은 Apache Beam에서 사용되는 표준 구현을 제공합니다. 이 구현은 구현에 필요한 기능을 보여줍니다. 이 기능에 대한 공식적인 API가 없으므로 각 구현은 특정 데이터 처리 프레임 워크에 관용적 인 API를 사용할 수 있습니다.

Apache Beam 구현은 사전 처리 기능을 위해 데이터를 처리하는 데 사용되는 두 개의 PTransform 제공합니다. 다음은 복합 PTransform AnalyzeAndTransformDataset 의 사용법을 보여줍니다.

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

transformed_data 컨텐츠는 아래에 표시되며 원시 데이터와 동일한 형식의 변환 된 열을 포함합니다. 특히, s_integerized 의 값은 [0, 1, 0] 입니다.이 값은 helloworld 라는 단어가 결정적인 정수로 매핑 된 방법에 따라 다릅니다. x_centered 열에 대해 평균을 빼서 [1.0, 2.0, 3.0] 이었던 x 열의 값이 [-1.0, 0.0, 1.0] . 마찬가지로 나머지 열은 예상 값과 일치합니다.

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

raw_datatransformed_data 는 모두 데이터 세트입니다. 다음 두 섹션에서는 Beam 구현이 데이터 세트를 나타내는 방법과 데이터를 읽고 디스크에 쓰는 방법을 보여줍니다. 다른 반환 값인 transform_fn 은 아래에서 자세히 설명하는 데이터에 적용된 transform_fn 을 나타냅니다.

AnalyzeAndTransformDatasetAnalyzeDatasetTransformDataset 구현에서 제공하는 두 가지 기본 변환의 구성입니다. 따라서 다음 두 코드 스 니펫은 동일합니다.

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn 은 데이터 세트의 각 행에 적용되는 연산을 나타내는 순수 함수입니다. 특히 분석기 값은 이미 계산되어 상수로 처리됩니다. 이 예에서 transform_fn 에는 x 열의 평균, y 열의 최소 및 최대, 문자열을 정수로 매핑하는 데 사용되는 어휘가 상수 transform_fn 포함됩니다.

의 중요한 기능 tf.Transform 점이다 transform_fn 순수한 기능은 개별적으로 각 행에 적용입니다 - 그것은 행을 통해지도를 나타냅니다. 행 집계를위한 모든 계산은 AnalyzeDataset 에서 수행됩니다. 또한 transform_fn 은 서빙 그래프에 삽입 할 수있는 TensorFlow Graph 로 표현됩니다.

이 특수한 경우 최적화를 위해 AnalyzeAndTransformDataset 이 제공됩니다. 이것은 scikit-learn 에서 사용되는 것과 동일한 패턴으로 fit , transformfit_transform 메서드를 제공합니다.

데이터 형식 및 스키마

TFT Beam 구현은 두 가지 다른 입력 데이터 형식을 허용합니다. "instance dict"형식 (위의 예 및 simple_example.py 에서 볼 수 simple_example.py )은 직관적 인 형식이며 작은 데이터 세트에 적합하며 TFXIO ( Apache Arrow ) 형식은 향상된 성능을 제공하고 대규모 데이터 세트에 적합합니다.

Beam 구현은 PCollection과 함께 제공되는 "메타 데이터"에 의해 입력 PCollection이 어떤 형식에 있는지 알려줍니다.

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • raw_data_metadatadataset_metadata.DatasetMetadata (아래 " 'instance dict'형식"섹션 참조) 인 경우 raw_data 는 "instance dict"형식 일 것으로 예상됩니다.
  • raw_data_metadatatfxio.TensorAdapterConfig (아래 "TFXIO 형식"섹션 참조)이면 raw_data 는 TFXIO 형식 일 것으로 예상됩니다.

"인스턴스 사전"형식

이전 코드 예제에서 raw_data_metadata 정의하는 코드는 생략되었습니다. 메타 데이터에는 데이터의 레이아웃을 정의하는 스키마가 포함되어있어 다양한 형식에서 읽고 쓸 수 있습니다. 마지막 섹션에 표시된 메모리 내 형식조차 자체 설명이 아니며 텐서로 해석 되려면 스키마가 필요합니다.

예제 데이터에 대한 스키마 정의는 다음과 같습니다.

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

Schema proto에는 온 디스크 또는 인 메모리 형식의 데이터를 텐서로 구문 분석하는 데 필요한 정보가 포함되어 있습니다. 일반적으로 기능 키를 tf.io.FixedLenFeature , tf.io.VarLenFeaturetf.io.SparseFeature 값에 매핑하는 dict를 사용하여 schema_utils.schema_from_feature_spec 을 호출하여 구성됩니다. 자세한 내용은 tf.parse_example 문서를 참조하세요.

위에서 우리는 tf.io.FixedLenFeature 를 사용하여 각 기능에 고정 된 수의 값 (이 경우 단일 스칼라 값)이 포함되어 있음을 나타냅니다. tf.Transform 인스턴스를 일괄 처리하기 때문에 기능을 나타내는 실제 Tensor 는 알 수없는 차원이 일괄 차원 인 형태 (None,) 갖습니다.

TFXIO 형식

이 형식을 사용하면 데이터가 pyarrow.RecordBatch 포함될 것으로 예상됩니다. 테이블 형식 데이터의 경우 Apache Beam 구현은 다음 유형의 열로 구성된 Arrow RecordBatch 허용합니다.

  • pa.list_(<primitive>) , 여기서 <primitive>pa.int64() , pa.float32() pa.binary() 또는 pa.large_binary() 입니다.

  • pa.large_list(<primitive>)

위에서 사용한 장난감 입력 데이터 세트는 RecordBatch 로 표시 될 때 다음과 같습니다.

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

DatasetMetadata 유사은 "인스턴스 DICT"형식을 동반하는 데 필요한되고하는 tfxio.TensorAdapterConfig 동반 할 필요가 RecordBatch 말이지. RecordBatch es의 Arrow 스키마와 TensorRepresentations 로 구성되어 RecordBatch es의 열이 TensorFlow Tensor (tf.Tensor, tf.SparseTensor를 포함하되 이에 국한되지 않음)로 해석되는 방법을 고유하게 결정합니다.

TensorRepresentationspreprocessing_fn 허용하는 Tensor와 RecordBatch es의 열 사이의 관계를 설정하는 Dict[Text, TensorRepresentation] 입니다. 예를 들면 :

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

이는 preprocessing_fn inputs['x'] 가 입력 RecordBatch es의 'col1' 이라는 이름의 열에서 값을 가져 오는 조밀 한 tf.Tensor 여야하며 (일괄 처리 된) 모양이 [batch_size, 2] 여야 함을 의미합니다.

TensorRepresentationTensorFlow Metadata에 정의 된 Protobuf입니다.

TensorFlow와의 호환성

tf.Transform 은 위의 transform_fn TF 1.x 또는 TF 2.x 저장된 모델로 내보내기위한 지원을 제공합니다. 0.30 릴리스 이전의 기본 동작은 TF 1.x 저장된 모델을 내보냈습니다. 0.30 릴리스부터 기본 동작은 TF 2.x 동작이 명시 적으로 비활성화되지 않는 한 TF 2.x tf.compat.v1.disable_v2_behavior() 예 : tf.compat.v1.disable_v2_behavior() 호출).

EstimatorsSessions 와 같은 TF 1.x 개념을 사용하는 경우 tf.Transform 을 독립 실행 형 라이브러리로 사용하는 경우 force_tf_compat_v1=Truetft_beam.Context 에 전달하거나 TFX의 Transform 구성 요소에 전달하여 이전 동작을 유지할 수 있습니다.

transform_fn 을 TF 2.x 저장된 모델로 내보낼 때 preprocessing_fntf.function 사용하여 추적 할 수 있습니다. 또한 파이프 라인을 원격으로 실행하는 경우 (예 : DataflowRunner ) preprocessing_fn 및 모든 종속성이 여기에 설명 된대로 올바르게 패키징되었는지 확인 합니다 .

tf.Transform 을 사용하여 TF 2.x 저장된 tf.Transform 을 내보낼 때 발생하는 알려진 문제는 여기 에 설명되어 있습니다 .

Apache Beam을 사용한 입력 및 출력

지금까지 ( RecordBatch 또는 인스턴스 사전의) Python 목록에서 입력 및 출력 데이터를 RecordBatch . 이는 Apache Beam의 목록 작업 기능과 데이터의 주요 표현 인 PCollection 의존하는 단순화입니다.

PCollection 은 Beam 파이프 라인의 일부를 형성하는 데이터 표현입니다. Beam 파이프 라인은 AnalyzeDatasetTransformDataset 포함한 다양한 PTransform 을 적용하고 파이프 라인을 실행하여 형성됩니다. PCollection 은 기본 바이너리의 메모리에 생성되지 않고 워커간에 분산됩니다 (이 섹션에서는 메모리 내 실행 모드를 사용함).

미리 준비된 PCollection 소스 ( TFXIO )

구현에서 허용하는 RecordBatch 형식은 다른 TFX 라이브러리에서 허용하는 일반적인 형식입니다. 따라서 TFX는 디스크에서 다양한 형식의 파일을 읽고 RecordBatch 생성하는 편리한 "소스"(일명 TFXIO )를 제공 TensorAdapterConfig 추론 된 TensorRepresentations 포함하여 TensorAdapterConfig 를 제공 할 수도 있습니다.

이러한 TFXIO 는 패키지 tfx_bsl ( tfx_bsl.public.tfxio )에서 찾을 수 있습니다.

예 : "Census Income"데이터 세트

다음 예제에서는 디스크에서 데이터를 읽고 쓰고 데이터를 PCollection (목록 아님)으로 표시해야합니다. census_example.py 참조하십시오. 아래에서는 데이터를 다운로드하고이 예제를 실행하는 방법을 보여줍니다. "Census Income"데이터 세트는 UCI Machine Learning Repository에서 제공 합니다. 이 데이터 세트에는 범주 형 데이터와 숫자 형 데이터가 모두 포함됩니다.

데이터는 CSV 형식이며 처음 두 줄은 다음과 같습니다.

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

데이터 세트의 열은 범주 형 또는 숫자 형입니다. 이 데이터 세트는 분류 문제를 설명합니다. 개인이 연간 소득이 50K 이상 또는 미만인 마지막 열을 예측합니다. 그러나 tf.Transform 의 관점에서이 레이블은 또 다른 범주 형 열일뿐입니다.

미리 만들어진 TFXIO , BeamRecordCsvTFXIO 를 사용하여 CSV 라인을 RecordBatches 로 변환합니다. TFXIO 에는 두 가지 중요한 정보가 필요합니다.

  • 각 CSV 열에 대한 유형 및 모양 정보를 포함하는 TensorFlow 메타 데이터 스키마 TensorRepresentation 은 스키마의 선택적 부분입니다. 제공되지 않으면 (이 예의 경우) 유형 및 모양 정보에서 유추됩니다. TF 파싱 사양 (이 예에 표시됨)에서 변환하기 위해 제공하는 도우미 함수를 사용하거나 TensorFlow Data Validation 을 실행하여 스키마를 가져올 수 있습니다.

  • CSV 파일에 나타나는 순서대로 열 이름 목록입니다. 이러한 이름은 스키마의 기능 이름과 일치해야합니다.

이 예에서는 education-num 기능이 누락 될 수 있습니다. 그것이로서 표현되는 이러한 수단 tf.io.VarLenFeature feature_spec에서와 같은 tf.SparseTensorpreprocessing_fn . 다른 기능은 preprocessing_fn 에서 동일한 이름의 tf.Tensor 가됩니다.

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

CSV 행을 읽은 후 몇 가지 추가 수정 작업을 수행해야했습니다. 그렇지 않으면 CsvTFXIO 를 사용하여 파일 읽기와 RecordBatch es로 변환을 모두 처리 할 수 ​​있습니다.

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

전처리는 각 열을 수동으로 지정하는 대신 프로그래밍 방식으로 전처리 함수가 생성된다는 점을 제외하면 이전 예와 유사합니다. 아래의 전처리 함수에서 NUMERICAL_COLUMNSCATEGORICAL_COLUMNS 는 숫자 및 범주 열의 이름을 포함하는 목록입니다.

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

이전 예와의 한 가지 차이점은 레이블 열이 문자열에서 인덱스로의 매핑을 수동으로 지정한다는 것입니다. 따라서 '>50'0 매핑되고 '<=50K'1 매핑됩니다. '<=50K' 된 모델에서 어떤 인덱스가 어떤 레이블에 해당하는지 아는 것이 유용하기 때문입니다.

record_batches 변수가 나타내는 PCollectionpyarrow.RecordBatch ES를. tensor_adapter_config 주어진다 csv_tfxio 추론된다 SCHEMA 합니다 (TF 파싱 사양에서, 본 실시 예에서, 그리고 궁극적으로).

마지막 단계는 변환 된 데이터를 디스크에 쓰는 것이며 원시 데이터를 읽는 것과 유사한 형태를 갖습니다. 이를 수행하는 데 사용되는 스키마는 출력 데이터에 대한 스키마를 추론하는 AnalyzeAndTransformDataset 출력의 일부입니다. 디스크에 쓰는 코드는 다음과 같습니다. 스키마는 메타 데이터의 일부이지만 tf.Transform API에서 두 가지를 서로 바꿔서 사용합니다 (예 : 메타 데이터를 ExampleProtoCoder 전달). 이것은 다른 형식으로 씁니다. textio.WriteToText 대신 TFRecord 형식에 대한 Beam의 내장 지원을 사용하고 코더를 사용하여 데이터를 Example TFRecord 로 인코딩합니다. 다음 섹션에서 볼 수 있듯이 이것은 훈련에 사용하기 더 좋은 형식입니다. transformed_eval_data_base 는 작성된 개별 샤드에 대한 기본 파일 이름을 제공합니다.

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

학습 데이터 외에도 transform_fn 은 메타 데이터와 함께 작성됩니다.

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

p.run().wait_until_finish() 하여 전체 Beam 파이프 라인을 실행합니다. 이 시점까지 Beam 파이프 라인은 지연된 분산 계산을 나타냅니다. 수행 할 작업에 대한 지침을 제공하지만 명령이 실행되지 않았습니다. 이 마지막 호출은 지정된 파이프 라인을 실행합니다.

인구 조사 데이터 세트 다운로드

다음 셸 명령어를 사용하여 인구 조사 데이터 세트를 다운로드합니다.

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

census_example.py 스크립트를 실행할 때이 데이터가 포함 된 디렉토리를 첫 번째 인수로 전달하십시오. 스크립트는 전처리 된 데이터를 추가하기 위해 임시 하위 디렉토리를 만듭니다.

TensorFlow Training과 통합

census_example.py 의 마지막 섹션은 전처리 된 데이터를 사용하여 모델을 학습시키는 방법을 보여줍니다. 자세한 내용은 Estimators 문서 를 참조하세요. 첫 번째 단계는 전처리 된 열에 대한 설명이 필요한 Estimator 를 구성하는 것입니다. 각 숫자 열은 고정 된 크기 (이 예에서는 1 의 조밀 한 벡터를 둘러싼 래퍼 인 real_valued_column 으로 설명됩니다. 각 범주 형 열은 문자열에서 정수로 매핑 된 다음 indicator_column 전달됩니다. tft.TFTransformOutput 은 각 범주 기능에 대한 어휘 파일 경로를 찾는 데 사용됩니다.

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

다음 단계는 훈련 및 평가를위한 입력 함수를 생성하는 빌더를 만드는 것입니다. 변환 된 데이터를 구문 분석하는 데 기능 사양이 필요하지 않기 때문에 tf.Learn 사용하는 훈련과 다릅니다. 대신 변환 된 데이터의 메타 데이터를 사용하여 기능 사양을 생성하세요.

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

나머지 코드는 Estimator 클래스를 사용하는 것과 동일합니다. 이 예제에는 SavedModel 모델 형식으로 모델을 내보내는 코드도 포함되어 있습니다. 내 보낸 모델은 Tensorflow Serving 또는 Cloud ML Engine 에서 사용할 수 있습니다.