หน้านี้ได้รับการแปลโดย Cloud Translation API
Switch to English

เริ่มต้นกับ TensorFlow Transform

คู่มือนี้แนะนำแนวคิดพื้นฐานของ tf.Transform และวิธีการใช้งาน มันจะ:

  • กำหนด ฟังก์ชันก่อนการประมวลผล คำอธิบายเชิงตรรกะของไปป์ไลน์ที่แปลงข้อมูลดิบเป็นข้อมูลที่ใช้ในการฝึกโมเดลแมชชีนเลิร์นนิง
  • แสดงการใช้งาน Apache Beam ที่ ใช้ในการแปลงข้อมูลโดยการแปลง ฟังก์ชันก่อน การ ประมวลผล เป็น Beam pipeline
  • แสดงตัวอย่างการใช้งานเพิ่มเติม

กำหนดฟังก์ชันก่อนการประมวลผล

ฟังก์ชันก่อน การ ประมวลผล เป็นแนวคิดที่สำคัญที่สุดของ tf.Transform ฟังก์ชันก่อนการประมวลผลเป็นคำอธิบายเชิงตรรกะของการเปลี่ยนแปลงของชุดข้อมูล ฟังก์ชั่น preprocessing ยอมรับและผลตอบแทนในพจนานุกรมของเทนเซอร์ที่เมตริกซ์หมายถึง Tensor หรือ SparseTensor มีฟังก์ชันสองประเภทที่ใช้ในการกำหนดฟังก์ชันก่อนการประมวลผล:

  1. ฟังก์ชันใด ๆ ที่ยอมรับและส่งคืนค่าเทนเซอร์ สิ่งเหล่านี้เพิ่มการดำเนินการ TensorFlow ให้กับกราฟที่แปลงข้อมูลดิบเป็นข้อมูลที่แปลงแล้ว
  2. เครื่องวิเคราะห์ ใด ๆ ที่จัดทำโดย tf.Transform เครื่องวิเคราะห์ยังยอมรับและส่งคืนค่าเทนเซอร์ แต่ต่างจากฟังก์ชัน TensorFlow คือ ไม่ เพิ่มการดำเนินการลงในกราฟ ตัววิเคราะห์ทำให้เกิด tf.Transform แทนแทนเพื่อคำนวณการดำเนินการแบบเต็มผ่านภายนอก TensorFlow พวกเขาใช้ค่าเทนเซอร์อินพุตเหนือชุดข้อมูลทั้งหมดเพื่อสร้างค่าเทนเซอร์คงที่ที่ส่งคืนเป็นเอาต์พุต ตัวอย่างเช่น tft.min คำนวณค่าต่ำสุดของเทนเซอร์บนชุดข้อมูล tf.Transform มีชุดเครื่องมือวิเคราะห์ที่ตายตัว แต่จะขยายไปในเวอร์ชันต่อ ๆ ไป

ตัวอย่างฟังก์ชันก่อนการประมวลผล

ด้วยการรวมตัววิเคราะห์และฟังก์ชัน TensorFlow ปกติผู้ใช้สามารถสร้างไปป์ไลน์ที่ยืดหยุ่นสำหรับการแปลงข้อมูล ฟังก์ชันก่อนการประมวลผลต่อไปนี้จะแปลงคุณสมบัติทั้งสามอย่างแตกต่างกันและรวมคุณสมบัติสองอย่างเข้าด้วยกัน:

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

ในที่นี้ x , y และ s คือ Tensor s ที่แสดงถึงคุณสมบัติการป้อนข้อมูล เทนเซอร์ใหม่ตัวแรกที่สร้างขึ้น x_centered สร้างขึ้นโดยใช้ tft.mean กับ x และลบค่านี้ออกจาก x tft.mean(x) ส่งกลับค่าเทนเซอร์ที่แทนค่าเฉลี่ยของเทนเซอร์ x x_centered คือเทนเซอร์ x มีค่าเฉลี่ยลบ

เทนเซอร์ใหม่ตัวที่สอง y_normalized ถูกสร้างขึ้นในลักษณะที่คล้ายกัน แต่ใช้วิธีอำนวยความสะดวก tft.scale_to_0_1 วิธีนี้จะคล้ายกับการคำนวณ x_centered คือการคำนวณค่าสูงสุดและต่ำสุดและใช้สิ่งเหล่านี้เพื่อปรับขนาด y

เทนเซอร์ s_integerized แสดงตัวอย่างของการจัดการสตริง ในกรณีนี้เราใช้สตริงและแมปกับจำนวนเต็ม ซึ่งใช้ฟังก์ชันอำนวยความสะดวกtft.compute_and_apply_vocabulary ฟังก์ชันนี้ใช้ตัววิเคราะห์เพื่อคำนวณค่าเฉพาะที่นำมาจากสตริงอินพุตจากนั้นใช้การดำเนินการ TensorFlow เพื่อแปลงสตริงอินพุตเป็นดัชนีในตารางของค่าที่ไม่ซ้ำกัน

คอลัมน์สุดท้ายแสดงให้เห็นว่าสามารถใช้การดำเนินการ TensorFlow เพื่อสร้างคุณสมบัติใหม่โดยการรวมเทนเซอร์

ฟังก์ชันก่อนการประมวลผลกำหนดไปป์ไลน์ของการดำเนินการบนชุดข้อมูล ในการใช้ไปป์ไลน์เราอาศัยการใช้ tf.Transform API อย่างเป็นรูปธรรม การใช้ Apache Beam ให้ PTransform ซึ่งใช้ฟังก์ชันการประมวลผลล่วงหน้าของผู้ใช้กับข้อมูล เวิร์กโฟลว์ทั่วไปของผู้ใช้ tf.Transform จะสร้างฟังก์ชันก่อนการประมวลผลจากนั้นรวมสิ่งนี้เข้ากับไปป์ไลน์ Beam ที่ใหญ่ขึ้นสร้างข้อมูลสำหรับการฝึกอบรม

การผสม

Batching เป็นส่วนสำคัญของ TensorFlow เนื่องจากเป้าหมายอย่างหนึ่งของ tf.Transform คือการจัดเตรียมกราฟ TensorFlow สำหรับการประมวลผลล่วงหน้าที่สามารถรวมเข้ากับกราฟการแสดงผล (และเป็นทางเลือกของกราฟการฝึกอบรม) การทำแบตช์ก็เป็นแนวคิดที่สำคัญเช่นกันใน tf.Transform

แม้ว่าจะไม่ชัดเจนในตัวอย่างข้างต้น แต่ฟังก์ชันการประมวลผลล่วงหน้าที่ผู้ใช้กำหนดจะถูกส่งผ่านเทนเซอร์ที่เป็นตัวแทนของ แบทช์ ไม่ใช่แต่ละอินสแตนซ์ดังเช่นที่เกิดขึ้นระหว่างการฝึกอบรมและการให้บริการกับ TensorFlow ในทางกลับกันนักวิเคราะห์จะทำการคำนวณในชุดข้อมูลทั้งหมดที่ส่งคืนค่าเดียวไม่ใช่ชุดของค่า x คือ Tensor มีรูปร่างเป็น (batch_size,) ในขณะที่ tft.mean(x) คือ Tensor มีรูปร่างเป็น () การลบ x - tft.mean(x) ออกอากาศโดยที่ค่าของ tft.mean(x) ถูกลบออกจากทุกองค์ประกอบของแบทช์ที่แสดงด้วย x

การติดตั้ง Apache Beam

แม้ว่า ฟังก์ชันก่อน การ ประมวลผล มีจุดมุ่งหมายเพื่อเป็นคำอธิบายเชิงตรรกะของ ไปป์ไลน์การประมวลผลล่วงหน้าที่ ใช้กับเฟรมเวิร์กการประมวลผลข้อมูลหลาย ๆ เฟรม tf.Transform จัดเตรียมการใช้งานแบบบัญญัติที่ใช้กับ Apache Beam การใช้งานนี้แสดงให้เห็นถึงฟังก์ชันที่จำเป็นจากการใช้งาน ไม่มี API ที่เป็นทางการสำหรับฟังก์ชันนี้ดังนั้นการใช้งานแต่ละครั้งสามารถใช้ API ที่เป็นสำนวนสำหรับกรอบการประมวลผลข้อมูลโดยเฉพาะ

การใช้งาน Apache Beam มี PTransform สอง PTransform ที่ใช้ในการประมวลผลข้อมูลสำหรับฟังก์ชันก่อนการประมวลผล ต่อไปนี้แสดงการใช้งานสำหรับ PTransform AnalyzeAndTransformDataset :

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

เนื้อหา transformed_data แสดงอยู่ด้านล่างและมีคอลัมน์ที่ถูกแปลงในรูปแบบเดียวกับข้อมูลดิบ โดยเฉพาะอย่างยิ่งค่าของ s_integerized คือ [0, 1, 0] - ค่าเหล่านี้ขึ้นอยู่กับว่าคำว่า hello และ world ถูกจับคู่กับจำนวนเต็มอย่างไรซึ่งเป็นปัจจัยกำหนด สำหรับคอลัมน์ x_centered เราจะลบค่าเฉลี่ยออกเพื่อให้ค่าของคอลัมน์ x ซึ่งเป็น [1.0, 2.0, 3.0] กลายเป็น [-1.0, 0.0, 1.0] ในทำนองเดียวกันคอลัมน์ที่เหลือจะตรงกับค่าที่คาดไว้

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

ทั้ง raw_data และ transformed_data เป็นชุดข้อมูล สองส่วนถัดไปจะแสดงวิธีการใช้งาน Beam แทนชุดข้อมูลและวิธีอ่านและเขียนข้อมูลลงดิสก์ ค่าส่งคืนอื่น transform_fn แสดงถึงการแปลงที่ใช้กับข้อมูลซึ่งครอบคลุมรายละเอียดด้านล่าง

AnalyzeAndTransformDataset คือองค์ประกอบของการแปลงพื้นฐานสองแบบที่จัดทำโดยการใช้งาน AnalyzeDataset และ TransformDataset ดังนั้นข้อมูลโค้ดสองรายการต่อไปนี้จึงเทียบเท่ากัน:

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn เป็นฟังก์ชันบริสุทธิ์ที่แสดงถึงการดำเนินการที่ใช้กับแต่ละแถวของชุดข้อมูล โดยเฉพาะอย่างยิ่งค่าตัววิเคราะห์ได้รับการคำนวณและถือว่าเป็นค่าคงที่แล้ว ในตัวอย่าง transform_fn ประกอบด้วยค่าเฉลี่ยของคอลัมน์ x ค่าต่ำสุดและสูงสุดของคอลัมน์ y และคำศัพท์ที่ใช้ในการแมปสตริงกับจำนวนเต็ม

คุณลักษณะที่สำคัญของ tf.Transform คือ transform_fn แสดงถึงแผนที่ ทับแถว ซึ่งเป็นฟังก์ชันบริสุทธิ์ที่ใช้กับแต่ละแถวแยกกัน การคำนวณทั้งหมดสำหรับการรวมแถวจะทำใน AnalyzeDataset นอกจากนี้ transform_fn ยังแสดงเป็น TensorFlow Graph ซึ่งสามารถฝังลงในกราฟการแสดงผล

AnalyzeAndTransformDataset มีไว้สำหรับการเพิ่มประสิทธิภาพในกรณีพิเศษนี้ นี่เป็นรูปแบบเดียวกับที่ใช้ใน scikit-learn โดยมีวิธีการ fit , transform และ fit_transform

รูปแบบข้อมูลและสคีมา

การใช้งาน TFT Beam ยอมรับรูปแบบข้อมูลอินพุตที่แตกต่างกันสองรูปแบบ รูปแบบ "instance dict" (ดังที่เห็นในตัวอย่างด้านบนและใน simple_example.py ) เป็นรูปแบบที่ใช้งานง่ายและเหมาะสำหรับชุดข้อมูลขนาดเล็กในขณะที่รูปแบบ TFXIO ( Apache Arrow ) ให้ประสิทธิภาพที่ดีขึ้นและเหมาะสำหรับชุดข้อมูลขนาดใหญ่

การใช้งาน Beam จะบอกว่ารูปแบบใดที่ PCollection อินพุตจะอยู่ใน "ข้อมูลเมตา" ที่มาพร้อมกับ PCollection:

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • หาก raw_data_metadata เป็น dataset_metadata.DatasetMetadata (ดูด้านล่างส่วน "รูปแบบ" instance dict ") ดังนั้น raw_data จะอยู่ในรูปแบบ" instance dict "
  • ถ้า raw_data_metadata เป็น tfxio.TensorAdapterConfig (ดูด้านล่างส่วน "รูปแบบ TFXIO") ดังนั้น raw_data จะอยู่ในรูปแบบ TFXIO

รูปแบบ "instance dict"

ในตัวอย่างโค้ดก่อนหน้านี้โค้ดที่กำหนด raw_data_metadata จะถูกละไว้ ข้อมูลเมตาประกอบด้วยสคีมาที่กำหนดโครงร่างของข้อมูลเพื่อให้อ่านและเขียนเป็นรูปแบบต่างๆ แม้แต่รูปแบบในหน่วยความจำที่แสดงในส่วนสุดท้ายก็ไม่ได้อธิบายตัวเองและต้องใช้สคีมาเพื่อที่จะตีความเป็นเทนเซอร์

นี่คือคำจำกัดความของสคีมาสำหรับข้อมูลตัวอย่าง:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

คลาส dataset_schema.Schema มีข้อมูลที่จำเป็นในการแยกวิเคราะห์ข้อมูลจากรูปแบบบนดิสก์หรือในหน่วยความจำลงในเทนเซอร์ โดยทั่วไปจะสร้างโดยการเรียก schema_utils.schema_from_feature_spec ด้วยคีย์คุณลักษณะการแม็ป dict ไปยัง tf.io.FixedLenFeature , tf.io.VarLenFeature และค่า tf.io.SparseFeature ดูเอกสารสำหรับ tf.parse_example สำหรับรายละเอียดเพิ่มเติม

ด้านบนเราใช้ tf.io.FixedLenFeature เพื่อระบุว่าแต่ละคุณลักษณะมีจำนวนค่าคงที่ในกรณีนี้คือค่าสเกลาร์เดียว เนื่องจากอินสแตนซ์แบตช์ tf.Transform Tensor จริงที่แสดงถึงคุณลักษณะจะมีรูปร่าง (None,) โดยที่มิติที่ไม่รู้จักคือมิติชุดงาน

รูปแบบ TFXIO

ด้วยรูปแบบนี้ข้อมูลคาดว่าจะอยู่ใน pyarrow.RecordBatch สำหรับข้อมูลแบบตารางการใช้งาน Apache Beam ของเรายอมรับ Arrow RecordBatch es ที่ประกอบด้วยคอลัมน์ประเภทต่อไปนี้:

  • pa.list_(<primitive>) โดยที่ <primitive> คือ pa.int64() , pa.float32() pa.binary() หรือ pa.large_binary()

  • pa.large_list(<primitive>)

ชุดข้อมูลอินพุตของเล่นที่เราใช้ด้านบนเมื่อแสดงเป็น RecordBatch จะมีลักษณะดังต่อไปนี้:

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

เช่นเดียวกับ DatasetMetadata ที่จำเป็นต้องใช้ในการประกอบกับรูปแบบ "instance dict" จำเป็นต้องใช้ tfxio.TensorAdapterConfig เพื่อใช้กับ RecordBatch es ประกอบด้วย Arrow schema ของ RecordBatch es และ TensorRepresentations เพื่อกำหนดว่าคอลัมน์ใน RecordBatch es สามารถตีความเป็น TensorFlow Tensors ได้อย่างไร (รวมถึง แต่ไม่ จำกัด เฉพาะ tf.Tensor, tf.SparseTensor)

TensorRepresentations คือ Dict[Text, TensorRepresentation] ซึ่งสร้างความสัมพันธ์ระหว่าง Tensor ที่ preprocessing_fn ยอมรับและคอลัมน์ใน RecordBatch es ตัวอย่างเช่น:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

หมายความว่า inputs['x'] ใน preprocessing_fn ควรเป็น tf แบบหนาแน่นซึ่งค่ามาจากคอลัมน์ชื่อ 'col1' ในอินพุต RecordBatch es และรูปร่าง ( RecordBatchRecordBatch ) ควรเป็น [batch_size, 2]

TensorRepresentation คือ Protobuf ที่กำหนดไว้ใน TensorFlow Metadata

อินพุตและเอาต์พุตด้วย Apache Beam

จนถึงตอนนี้เราได้เห็นข้อมูลอินพุตและเอาต์พุตในรายการ python (ของ RecordBatch es หรือพจนานุกรมอินสแตนซ์) นี่คือการทำให้ง่ายขึ้นซึ่งอาศัยความสามารถของ Apache Beam ในการทำงานกับรายการตลอดจนการแสดงข้อมูลหลัก PCollection

PCollection คือการแสดงข้อมูลที่เป็นส่วนหนึ่งของไปป์ไลน์ Beam ไปป์ไลน์ Beam เกิดขึ้นจากการใช้ PTransform s ต่างๆรวมถึง AnalyzeDataset และ TransformDataset และรันไปป์ไลน์ PCollection ไม่ได้สร้างขึ้นในหน่วยความจำของไบนารีหลัก แต่จะกระจายไปยังผู้ปฏิบัติงานแทน (แม้ว่าส่วนนี้จะใช้โหมดการดำเนินการในหน่วยความจำ)

PCollection Sources ( TFXIO ) TFXIO

รูปแบบ RecordBatch ที่การใช้งานของเรายอมรับเป็นรูปแบบทั่วไปที่ไลบรารี TFX อื่นยอมรับ ดังนั้น TFX จึงนำเสนอ "แหล่งข้อมูล" ที่สะดวก (aka TFXIO ) ที่อ่านไฟล์ในรูปแบบต่างๆบนดิสก์และสร้าง RecordBatch es และยังสามารถให้ TensorAdapterConfig รวมถึง TensorRepresentations อนุมานได้

TFXIO เหล่านี้สามารถพบได้ในแพ็คเกจ tfx_bsl ( tfx_bsl.public.tfxio )

ตัวอย่าง: ชุดข้อมูล "รายได้สำมะโนประชากร"

ตัวอย่างต่อไปนี้ต้องการทั้งการอ่านและการเขียนข้อมูลบนดิสก์และการแสดงข้อมูลเป็น PCollection (ไม่ใช่รายการ) โปรดดู: census_example.py ด้านล่างเราจะแสดงวิธีดาวน์โหลดข้อมูลและเรียกใช้ตัวอย่างนี้ ชุดข้อมูล "รายได้สำมะโนประชากร" จัดทำโดย UCI Machine Learning Repository ชุดข้อมูลนี้มีทั้งข้อมูลที่เป็นหมวดหมู่และข้อมูลตัวเลข

ข้อมูลอยู่ในรูปแบบ CSV สองบรรทัดแรกมีดังนี้

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

คอลัมน์ของชุดข้อมูลมีทั้งแบบหมวดหมู่หรือตัวเลข ชุดข้อมูลนี้อธิบายถึงปัญหาการจัดหมวดหมู่: การคาดการณ์คอลัมน์สุดท้ายที่แต่ละคนมีรายได้มากกว่าหรือน้อยกว่า 50K ต่อปี อย่างไรก็ตามจากมุมมองของ tf.Transform ป้ายกำกับนี้เป็นเพียงคอลัมน์หมวดหมู่อื่น

เราใช้ TFXIO สำเร็จรูป BeamRecordCsvTFXIO เพื่อแปลบรรทัด CSV เป็น RecordBatches TFXIO ต้องการข้อมูลสำคัญสองส่วน:

  • TensorFlow Metadata Schema ที่มีข้อมูลประเภทและรูปร่างเกี่ยวกับคอลัมน์ CSV แต่ละคอลัมน์ TensorRepresentation s เป็นส่วนเสริมของ Schema หากไม่ได้ระบุไว้ (ซึ่งเป็นกรณีในตัวอย่างนี้) ข้อมูลเหล่านี้จะสรุปได้จากข้อมูลประเภทและรูปร่าง เราสามารถรับ Schema ได้โดยใช้ฟังก์ชันตัวช่วยที่เรามีให้เพื่อแปลจากข้อกำหนดการแยกวิเคราะห์ TF (แสดงในตัวอย่างนี้) หรือโดยการเรียกใช้ TensorFlow Data Validation

  • รายชื่อคอลัมน์ตามลำดับที่ปรากฏในไฟล์ CSV โปรดทราบว่าชื่อเหล่านั้นต้องตรงกับชื่อคุณลักษณะในสคีมา

ในตัวอย่างนี้เราอนุญาตให้ไม่มีคุณลักษณะ education-num ซึ่งหมายความว่าจะแสดงเป็น tf.io.VarLenFeature ใน feature_spec และเป็น tf.SparseTensor ใน preprocessing_fn คุณสมบัติอื่น ๆ จะกลายเป็น tf.Tensor ที่มีชื่อเดียวกันใน preprocessing_fn

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

โปรดทราบว่าเราต้องทำการแก้ไขเพิ่มเติมหลังจากอ่านบรรทัด CSV แล้วมิฉะนั้นเราสามารถพึ่งพา CsvTFXIO เพื่อจัดการทั้งการอ่านไฟล์และแปลเป็น RecordBatch es:

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

การประมวลผลล่วงหน้าจะคล้ายกับตัวอย่างก่อนหน้านี้ยกเว้นฟังก์ชันก่อนการประมวลผลจะถูกสร้างขึ้นโดยโปรแกรมแทนที่จะระบุแต่ละคอลัมน์ด้วยตนเอง ในฟังก์ชันก่อนการประมวลผลด้านล่าง NUMERICAL_COLUMNS และ CATEGORICAL_COLUMNS เป็นรายการที่มีชื่อของคอลัมน์ตัวเลขและหมวดหมู่:

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

ความแตกต่างอย่างหนึ่งจากตัวอย่างก่อนหน้านี้คือคอลัมน์เลเบลระบุการแมปจากสตริงไปยังดัชนีด้วยตนเอง ดังนั้น '>50' จึงถูกจับคู่กับ 0 และ '<=50K' ถูกจับคู่กับ 1 เนื่องจากมีประโยชน์ในการทราบว่าดัชนีใดในแบบจำลองที่ได้รับการฝึกอบรมตรงกับป้ายกำกับใด

ตัวแปร record_batches แสดงถึง PCollection ของ pyarrow.RecordBatch es tensor_adapter_config กำหนดโดย csv_tfxio ซึ่งอนุมานจาก SCHEMA (และท้ายที่สุดในตัวอย่างนี้จากข้อกำหนดการแยกวิเคราะห์ TF)

ขั้นตอนสุดท้ายคือการเขียนข้อมูลที่แปลงแล้วลงในดิสก์และมีรูปแบบคล้ายกับการอ่านข้อมูลดิบ สคีมาที่ใช้ในการทำสิ่งนี้เป็นส่วนหนึ่งของเอาต์พุตของ AnalyzeAndTransformDataset ซึ่งอนุมาน schema สำหรับข้อมูลเอาต์พุต โค้ดที่จะเขียนลงดิสก์แสดงด้านล่าง สคีมาเป็นส่วนหนึ่งของข้อมูลเมตา แต่ใช้ทั้งสองอย่างสลับกันใน tf.Transform API (เช่นส่งข้อมูลเมตาไปยัง ExampleProtoCoder ) โปรดทราบว่าสิ่งนี้เขียนในรูปแบบอื่น แทนที่จะใช้ textio.WriteToText ให้ใช้การสนับสนุนในตัวของ Beam สำหรับรูปแบบ TFRecord และใช้ coder เพื่อเข้ารหัสข้อมูลเป็น Example protos นี่เป็นรูปแบบที่ดีกว่าที่จะใช้สำหรับการฝึกอบรมดังแสดงในหัวข้อถัดไป transformed_eval_data_base จัดเตรียมชื่อไฟล์พื้นฐานสำหรับแต่ละชาร์ดที่เขียน

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

นอกเหนือจากข้อมูลการฝึกอบรมแล้ว transform_fn ยังเขียนด้วยข้อมูลเมตา:

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

รัน Beam pipeline ทั้งหมดด้วย p.run().wait_until_finish() จนถึงจุดนี้ไปป์ไลน์ Beam แสดงการคำนวณแบบกระจายที่รอการตัดบัญชี มีคำแนะนำสำหรับสิ่งที่จะทำ แต่คำแนะนำยังไม่ถูกดำเนินการ การเรียกครั้งสุดท้ายนี้ดำเนินการไปป์ไลน์ที่ระบุ

ดาวน์โหลดชุดข้อมูลสำมะโนประชากร

ดาวน์โหลดชุดข้อมูลสำมะโนประชากรโดยใช้คำสั่งเชลล์ต่อไปนี้:

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

เมื่อรันสคริปต์ census_example.py ให้ส่งผ่านไดเร็กทอรีที่มีข้อมูลนี้เป็นอาร์กิวเมนต์แรก สคริปต์สร้างไดเร็กทอรีย่อยชั่วคราวเพื่อเพิ่มข้อมูลที่ประมวลผลล่วงหน้า

ผสานรวมกับการฝึกอบรม TensorFlow

ส่วนสุดท้ายของ census_example.py แสดงวิธีใช้ข้อมูลที่ประมวลผลล่วงหน้าเพื่อฝึกโมเดล ดูรายละเอียดใน เอกสารเครื่องมือประมาณการ ขั้นตอนแรกคือการสร้าง Estimator ซึ่งต้องมีคำอธิบายของคอลัมน์ที่ประมวลผลล่วงหน้า คอลัมน์ตัวเลขแต่ละคอลัมน์อธิบายว่าเป็น real_valued_column ซึ่งเป็นกระดาษห่อหุ้มรอบเวกเตอร์หนาแน่นที่มีขนาดคงที่ ( 1 ในตัวอย่างนี้) แต่ละคอลัมน์ที่จัดหมวดหมู่ถูกจับคู่จากสตริงเป็นจำนวนเต็มจากนั้นจะส่งผ่านไปยัง indicator_column tft.TFTransformOutput ใช้ในการค้นหาเส้นทางไฟล์คำศัพท์สำหรับแต่ละคุณลักษณะที่เป็นหมวดหมู่

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

ขั้นตอนต่อไปคือการสร้างตัวสร้างเพื่อสร้างฟังก์ชันอินพุตสำหรับการฝึกอบรมและการประเมินผล ความแตกต่างจากการฝึกอบรมที่ใช้โดย tf.Learn เนื่องจากไม่จำเป็นต้องใช้คุณลักษณะเฉพาะเพื่อแยกวิเคราะห์ข้อมูลที่แปลงแล้ว ให้ใช้ข้อมูลเมตาสำหรับข้อมูลที่แปลงแล้วเพื่อสร้างคุณลักษณะเฉพาะ

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

โค้ดที่เหลือจะเหมือนกับการใช้คลาส Estimator ตัวอย่างนี้ยังมีโค้ดสำหรับเอ็กซ์พอร์ตโมเดลในรูปแบบ SavedModel Tensorflow Serving หรือ Cloud ML Engine สามารถใช้โมเดลที่ส่งออกได้