เข้าร่วมชุมชน SIG TFX-Addons และช่วยปรับปรุง TFX ให้ดียิ่งขึ้น!

เริ่มต้นกับ TensorFlow Transform

คู่มือนี้แนะนำแนวคิดพื้นฐานของ tf.Transform และวิธีการใช้งาน มันจะ:

  • กำหนด ฟังก์ชันก่อนการประมวลผล คำอธิบายเชิงตรรกะของไปป์ไลน์ที่แปลงข้อมูลดิบเป็นข้อมูลที่ใช้ในการฝึกโมเดลแมชชีนเลิร์นนิง
  • แสดงการใช้งาน Apache Beam ที่ ใช้ในการแปลงข้อมูลโดยการแปลง ฟังก์ชันก่อน การ ประมวลผล เป็น Beam pipeline
  • แสดงตัวอย่างการใช้งานเพิ่มเติม

กำหนดฟังก์ชันก่อนการประมวลผล

ฟังก์ชันก่อน การ ประมวลผล เป็นแนวคิดที่สำคัญที่สุดของ tf.Transform ฟังก์ชันก่อนการประมวลผลเป็นคำอธิบายเชิงตรรกะของการเปลี่ยนแปลงของชุดข้อมูล ฟังก์ชั่น preprocessing ยอมรับและผลตอบแทนในพจนานุกรมของเทนเซอร์ที่เมตริกซ์หมายถึง Tensor หรือ SparseTensor มีฟังก์ชันสองประเภทที่ใช้ในการกำหนดฟังก์ชันก่อนการประมวลผล:

  1. ฟังก์ชันใด ๆ ที่ยอมรับและส่งคืนค่าเทนเซอร์ สิ่งเหล่านี้เพิ่มการดำเนินการ TensorFlow ให้กับกราฟที่แปลงข้อมูลดิบเป็นข้อมูลที่แปลงแล้ว
  2. เครื่องวิเคราะห์ ใด ๆ ที่จัดทำโดย tf.Transform เครื่องวิเคราะห์ยังยอมรับและส่งคืนเทนเซอร์ แต่ต่างจากฟังก์ชัน TensorFlow คือ ไม่ เพิ่มการดำเนินการลงในกราฟ ตัววิเคราะห์ทำให้เกิด tf.Transform แทนแทนเพื่อคำนวณการดำเนินการแบบเต็มผ่านภายนอก TensorFlow พวกเขาใช้ค่าเทนเซอร์อินพุตเหนือชุดข้อมูลทั้งหมดเพื่อสร้างค่าเทนเซอร์คงที่ที่ส่งคืนเป็นเอาต์พุต ตัวอย่างเช่น tft.min คำนวณค่าต่ำสุดของเทนเซอร์บนชุดข้อมูล tf.Transform มีชุดเครื่องมือวิเคราะห์ที่ตายตัว แต่จะขยายออกไปในเวอร์ชันต่อ ๆ ไป

ตัวอย่างฟังก์ชันการประมวลผลล่วงหน้า

ด้วยการรวมตัววิเคราะห์และฟังก์ชัน TensorFlow ปกติผู้ใช้สามารถสร้างไปป์ไลน์ที่ยืดหยุ่นสำหรับการแปลงข้อมูล ฟังก์ชันก่อนการประมวลผลต่อไปนี้จะแปลงคุณสมบัติทั้งสามอย่างแตกต่างกันและรวมคุณสมบัติสองอย่างเข้าด้วยกัน:

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

ในที่นี้ x , y และ s คือ Tensor s ที่แสดงถึงคุณสมบัติการป้อนข้อมูล เทนเซอร์ใหม่ตัวแรกที่สร้างขึ้น x_centered สร้างขึ้นโดยใช้ tft.mean กับ x และลบค่านี้ออกจาก x tft.mean(x) ส่งกลับค่าเทนเซอร์แทนค่าเฉลี่ยของเทนเซอร์ x x_centered คือเทนเซอร์ x มีค่าเฉลี่ยลบ

เทนเซอร์ใหม่ตัวที่สอง y_normalized ถูกสร้างขึ้นในลักษณะที่คล้ายกัน แต่ใช้วิธีอำนวยความสะดวก tft.scale_to_0_1 วิธีนี้ทำสิ่งที่คล้ายกับการคำนวณ x_centered กล่าวคือคำนวณค่าสูงสุดและต่ำสุดและใช้สิ่งเหล่านี้เพื่อปรับขนาด y

เทนเซอร์ s_integerized แสดงตัวอย่างของการจัดการสตริง ในกรณีนี้เราใช้สตริงและแมปกับจำนวนเต็ม สิ่งนี้ใช้ฟังก์ชันอำนวยความสะดวกtft.compute_and_apply_vocabulary ฟังก์ชันนี้ใช้ตัววิเคราะห์เพื่อคำนวณค่าเฉพาะที่นำมาจากสตริงอินพุตจากนั้นใช้การดำเนินการ TensorFlow เพื่อแปลงสตริงอินพุตเป็นดัชนีในตารางของค่าที่ไม่ซ้ำกัน

คอลัมน์สุดท้ายแสดงให้เห็นว่าสามารถใช้การดำเนินการ TensorFlow เพื่อสร้างคุณลักษณะใหม่โดยการรวมเทนเซอร์

ฟังก์ชันก่อนการประมวลผลกำหนดไปป์ไลน์ของการดำเนินการบนชุดข้อมูล ในการใช้ไปป์ไลน์เราต้องอาศัยการใช้ tf.Transform API อย่างเป็นรูปธรรม การใช้งาน Apache Beam ให้ PTransform ซึ่งใช้ฟังก์ชันการประมวลผลล่วงหน้าของผู้ใช้กับข้อมูล เวิร์กโฟลว์ทั่วไปของผู้ใช้ tf.Transform จะสร้างฟังก์ชันก่อนการประมวลผลจากนั้นรวมสิ่งนี้เข้ากับไปป์ไลน์ Beam ขนาดใหญ่สร้างข้อมูลสำหรับการฝึกอบรม

การผสม

Batching เป็นส่วนสำคัญของ TensorFlow เนื่องจากเป้าหมายประการหนึ่งของ tf.Transform คือการจัดเตรียมกราฟ TensorFlow สำหรับการประมวลผลล่วงหน้าที่สามารถรวมเข้ากับกราฟการแสดงผล (และเป็นทางเลือกของกราฟการฝึกอบรม) การทำแบตช์ก็เป็นแนวคิดที่สำคัญเช่นกันใน tf.Transform

แม้ว่าจะไม่ชัดเจนในตัวอย่างข้างต้น แต่ฟังก์ชันการประมวลผลล่วงหน้าที่ผู้ใช้กำหนดจะถูกส่งผ่านเทนเซอร์ที่เป็นตัวแทนของ แบทช์ ไม่ใช่แต่ละอินสแตนซ์เช่นเดียวกับที่เกิดขึ้นระหว่างการฝึกอบรมและการให้บริการกับ TensorFlow ในทางกลับกันนักวิเคราะห์จะทำการคำนวณในชุดข้อมูลทั้งหมดที่ส่งคืนค่าเดียวไม่ใช่ชุดของค่า x คือ Tensor มีรูปร่างเป็น (batch_size,) ในขณะที่ tft.mean(x) คือ Tensor มีรูปร่างเป็น () การลบ x - tft.mean(x) ออกอากาศโดยที่ค่าของ tft.mean(x) ถูกลบออกจากทุกองค์ประกอบของแบทช์ที่แสดงด้วย x

การติดตั้ง Apache Beam

ในขณะที่ ฟังก์ชันการประมวลผลล่วงหน้า มีจุดมุ่งหมายเพื่อเป็นคำอธิบายเชิงตรรกะของ ไปป์ไลน์การประมวลผลล่วงหน้าที่ ใช้กับเฟรมเวิร์กการประมวลผลข้อมูลหลายเฟรม tf.Transform ให้การใช้งานแบบบัญญัติที่ใช้กับ Apache Beam การใช้งานนี้แสดงให้เห็นถึงฟังก์ชันการทำงานที่จำเป็นจากการนำไปใช้งาน ไม่มี API ที่เป็นทางการสำหรับฟังก์ชันนี้ดังนั้นการใช้งานแต่ละครั้งจึงสามารถใช้ API ที่เป็นสำนวนสำหรับกรอบการประมวลผลข้อมูลโดยเฉพาะ

การใช้งาน Apache Beam มี PTransform สอง PTransform ที่ใช้ในการประมวลผลข้อมูลสำหรับฟังก์ชันก่อนการประมวลผล ต่อไปนี้แสดงการใช้งานสำหรับ PTransform AnalyzeAndTransformDataset :

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

เนื้อหา transformed_data แสดงอยู่ด้านล่างและมีคอลัมน์ที่ถูกแปลงในรูปแบบเดียวกับข้อมูลดิบ โดยเฉพาะอย่างยิ่งค่าของ s_integerized คือ [0, 1, 0] - ค่าเหล่านี้ขึ้นอยู่กับว่าคำว่า hello และ world ถูกจับคู่กับจำนวนเต็มอย่างไรซึ่งเป็นปัจจัยกำหนด สำหรับคอลัมน์ x_centered เราจะลบค่าเฉลี่ยออกเพื่อให้ค่าของคอลัมน์ x ซึ่งเป็น [1.0, 2.0, 3.0] กลายเป็น [-1.0, 0.0, 1.0] ในทำนองเดียวกันคอลัมน์ที่เหลือจะตรงกับค่าที่คาดไว้

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

ทั้ง raw_data และ transformed_data เป็นชุดข้อมูล สองส่วนถัดไปจะแสดงวิธีการใช้งาน Beam แทนชุดข้อมูลและวิธีการอ่านและเขียนข้อมูลลงดิสก์ ค่าส่งคืนอื่น transform_fn แสดงถึงการเปลี่ยนแปลงที่ใช้กับข้อมูลซึ่งครอบคลุมในรายละเอียดด้านล่าง

AnalyzeAndTransformDataset คือองค์ประกอบของการแปลงพื้นฐานสองแบบที่จัดทำโดยการใช้งาน AnalyzeDataset และ TransformDataset ดังนั้นข้อมูลโค้ดสองรายการต่อไปนี้จึงเทียบเท่ากัน:

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn เป็นฟังก์ชันบริสุทธิ์ที่แสดงถึงการดำเนินการที่นำไปใช้กับแต่ละแถวของชุดข้อมูล โดยเฉพาะอย่างยิ่งค่าตัววิเคราะห์ได้รับการคำนวณและถือว่าเป็นค่าคงที่แล้ว ในตัวอย่าง transform_fn ประกอบด้วยค่าเฉลี่ยของคอลัมน์ x ค่าต่ำสุดและสูงสุดของคอลัมน์ y และคำศัพท์ที่ใช้ในการแมปสตริงกับจำนวนเต็ม

คุณสมบัติที่สำคัญของ tf.Transform คือ transform_fn แสดงถึงแผนที่ ทับแถว ซึ่งเป็นฟังก์ชันบริสุทธิ์ที่ใช้กับแต่ละแถวแยกกัน การคำนวณทั้งหมดสำหรับการรวมแถวทำได้ใน AnalyzeDataset นอกจากนี้ transform_fn ยังแสดงเป็น TensorFlow Graph ซึ่งสามารถฝังลงในกราฟการแสดงผลได้

AnalyzeAndTransformDataset มีไว้สำหรับการเพิ่มประสิทธิภาพในกรณีพิเศษนี้ นี่เป็นรูปแบบเดียวกับที่ใช้ใน scikit-learn โดยมีวิธีการ fit , transform และ fit_transform

รูปแบบข้อมูลและสคีมา

การใช้งาน TFT Beam ยอมรับรูปแบบข้อมูลอินพุตที่แตกต่างกันสองรูปแบบ รูปแบบ "instance dict" (ดังที่เห็นในตัวอย่างด้านบนและ simple_example.py ) เป็นรูปแบบที่ใช้งานง่ายและเหมาะสำหรับชุดข้อมูลขนาดเล็กในขณะที่รูปแบบ TFXIO ( Apache Arrow ) ให้ประสิทธิภาพที่ดีขึ้นและเหมาะสำหรับชุดข้อมูลขนาดใหญ่

การใช้งาน Beam จะบอกว่ารูปแบบใดที่ PCollection อินพุตจะอยู่ใน "ข้อมูลเมตา" ที่มาพร้อมกับ PCollection:

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • ถ้า raw_data_metadata เป็น dataset_metadata.DatasetMetadata (ดูด้านล่างส่วน "รูปแบบ" instance dict ") ดังนั้น raw_data จะอยู่ในรูปแบบ" instance dict "
  • ถ้า raw_data_metadata เป็น tfxio.TensorAdapterConfig (ดูด้านล่างส่วน "รูปแบบ TFXIO") ดังนั้น raw_data ควรอยู่ในรูปแบบ TFXIO

รูปแบบ "instance dict"

ในตัวอย่างโค้ดก่อนหน้านี้โค้ดที่กำหนด raw_data_metadata จะถูกละไว้ ข้อมูลเมตาประกอบด้วยสคีมาที่กำหนดโครงร่างของข้อมูลเพื่อให้อ่านและเขียนเป็นรูปแบบต่างๆ แม้แต่รูปแบบในหน่วยความจำที่แสดงในส่วนสุดท้ายก็ไม่ได้อธิบายตัวเองและต้องใช้สคีมาเพื่อที่จะตีความเป็นเทนเซอร์

นี่คือคำจำกัดความของสคีมาสำหรับข้อมูลตัวอย่าง:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

โปรโตของ Schema มีข้อมูลที่จำเป็นในการแยกวิเคราะห์ข้อมูลจากรูปแบบ on-disk หรือในหน่วยความจำเป็นเทนเซอร์ โดยทั่วไปจะสร้างขึ้นโดยการเรียก schema_utils.schema_from_feature_spec ด้วยคีย์คุณลักษณะการแม็ป dict เป็น tf.io.FixedLenFeature , tf.io.VarLenFeature และค่า tf.io.SparseFeature ดูเอกสารสำหรับ tf.parse_example สำหรับรายละเอียดเพิ่มเติม

ด้านบนเราใช้ tf.io.FixedLenFeature เพื่อระบุว่าแต่ละคุณลักษณะมีจำนวนค่าคงที่ในกรณีนี้คือค่าสเกลาร์เดียว เนื่องจากอินสแตนซ์แบตช์ tf.Transform Tensor จริงที่แสดงถึงคุณลักษณะจะมีรูปร่าง (None,) โดยที่มิติที่ไม่รู้จักคือมิติชุดงาน

รูปแบบ TFXIO

ด้วยรูปแบบนี้ข้อมูลคาดว่าจะอยู่ใน pyarrow.RecordBatch สำหรับข้อมูลแบบตารางการใช้งาน Apache Beam ของเรายอมรับ Arrow RecordBatch es ที่ประกอบด้วยคอลัมน์ประเภทต่อไปนี้:

  • pa.list_(<primitive>) โดยที่ <primitive> คือ pa.int64() , pa.float32() pa.binary() หรือ pa.large_binary()

  • pa.large_list(<primitive>)

ชุดข้อมูลอินพุตของเล่นที่เราใช้ข้างต้นเมื่อแสดงเป็น RecordBatch จะมีลักษณะดังต่อไปนี้:

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

เช่นเดียวกับ DatasetMetadata ที่จำเป็นต้องใช้ในการมาพร้อมกับรูปแบบ "instance dict" จำเป็นต้องใช้ tfxio.TensorAdapterConfig เพื่อมาพร้อมกับ RecordBatch es ประกอบด้วย Arrow schema ของ RecordBatch es และ TensorRepresentations เพื่อกำหนดว่าคอลัมน์ใน RecordBatch es สามารถตีความเป็น TensorFlow Tensors ได้อย่างไร (รวมถึง แต่ไม่ จำกัด เฉพาะ tf.Tensor, tf.SparseTensor)

TensorRepresentations คือ Dict[Text, TensorRepresentation] ซึ่งสร้างความสัมพันธ์ระหว่าง Tensor ที่ preprocessing_fn ยอมรับและคอลัมน์ใน RecordBatch es ตัวอย่างเช่น:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

หมายความว่า inputs['x'] ใน preprocessing_fn ควรเป็น tf แบบหนาแน่นซึ่งค่ามาจากคอลัมน์ชื่อ 'col1' ในอินพุต RecordBatch es และรูปร่าง ( RecordBatchRecordBatch ) ควรเป็น [batch_size, 2]

TensorRepresentation คือ Protobuf ที่กำหนดไว้ใน TensorFlow Metadata

ความเข้ากันได้กับ TensorFlow

tf.Transform ให้การสนับสนุนสำหรับการส่งออก transform_fn ด้านบนไม่ว่าจะเป็น TF 1.x หรือ TF 2.x SavedModel พฤติกรรมเริ่มต้นก่อนรีลีส 0.30 ส่งออก TF 1.x SavedModel เริ่มต้นด้วยรีลีส 0.30 พฤติกรรมเริ่มต้นคือการเอ็กซ์พอร์ต TF 2.x SavedModel เว้นแต่พฤติกรรม TF 2.x จะถูกปิดใช้งานอย่างชัดเจน (โดยการเรียก tf.compat.v1.disable_v2_behavior() เป็นต้น)

หากใช้แนวคิด TF 1.x เช่น Estimators และ Sessions คุณสามารถคงพฤติกรรมก่อนหน้านี้ได้โดยส่ง force_tf_compat_v1=True ไปยัง tft_beam.Context ถ้าใช้ tf.Transform เป็นไลบรารีแบบสแตนด์อโลนหรือไปยังคอมโพเนนต์ Transform ใน TFX

เมื่อเอ็กซ์พอร์ต transform_fn เป็น TF 2.x SavedModel คาดว่า preprocessing_fn จะสามารถตรวจสอบย้อนกลับได้โดยใช้ tf.function นอกจากนี้หากเรียกใช้ไปป์ไลน์ของคุณจากระยะไกล (ตัวอย่างเช่นด้วย DataflowRunner ) ตรวจสอบให้แน่ใจว่า preprocessing_fn และการอ้างอิงใด ๆ ได้รับการบรรจุอย่างถูกต้องตามที่อธิบายไว้ ที่นี่

ปัญหาที่ทราบเกี่ยวกับการใช้ tf.Transform เพื่อส่งออก TF 2.x SavedModel ได้รับการบันทึกไว้ ที่นี่

อินพุตและเอาต์พุตด้วย Apache Beam

จนถึงตอนนี้เราได้เห็นข้อมูลอินพุตและเอาต์พุตในรายการ python (ของ RecordBatch es หรือพจนานุกรมอินสแตนซ์) นี่คือการทำให้ง่ายขึ้นซึ่งอาศัยความสามารถของ Apache Beam ในการทำงานกับรายการตลอดจนการแสดงข้อมูลหลัก PCollection

PCollection คือการแสดงข้อมูลที่เป็นส่วนหนึ่งของไปป์ไลน์ Beam ไปป์ไลน์ Beam เกิดจากการใช้ PTransform s ต่างๆรวมถึง AnalyzeDataset และ TransformDataset และรันไปป์ไลน์ PCollection ไม่ได้สร้างขึ้นในหน่วยความจำของไบนารีหลัก แต่จะกระจายไปยังผู้ปฏิบัติงานแทน (แม้ว่าส่วนนี้จะใช้โหมดการดำเนินการในหน่วยความจำ)

แหล่งที่มาของ PCollection ก่อนบรรจุกระป๋อง ( TFXIO )

รูปแบบ RecordBatch ที่การใช้งานของเรายอมรับเป็นรูปแบบทั่วไปที่ไลบรารี TFX อื่นยอมรับ ดังนั้น TFX จึงนำเสนอ "แหล่งที่มา" ที่สะดวก (aka TFXIO ) ที่อ่านไฟล์ในรูปแบบต่างๆบนดิสก์และสร้าง RecordBatch es และยังสามารถให้ TensorAdapterConfig รวมถึง TensorRepresentations อนุมานได้

TFXIO เหล่านี้สามารถพบได้ในแพ็คเกจ tfx_bsl ( tfx_bsl.public.tfxio )

ตัวอย่าง: ชุดข้อมูล "รายได้สำมะโนประชากร"

ตัวอย่างต่อไปนี้ต้องการทั้งการอ่านและการเขียนข้อมูลบนดิสก์และการแสดงข้อมูลเป็น PCollection (ไม่ใช่รายการ) โปรดดู: census_example.py ด้านล่างนี้เราจะแสดงวิธีดาวน์โหลดข้อมูลและเรียกใช้ตัวอย่างนี้ ชุดข้อมูล "รายได้สำมะโนประชากร" จัดทำโดย UCI Machine Learning Repository ชุดข้อมูลนี้มีทั้งข้อมูลที่เป็นหมวดหมู่และข้อมูลตัวเลข

ข้อมูลอยู่ในรูปแบบ CSV สองบรรทัดแรกมีดังนี้

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

คอลัมน์ของชุดข้อมูลมีทั้งแบบหมวดหมู่หรือตัวเลข ชุดข้อมูลนี้อธิบายถึงปัญหาการจัดหมวดหมู่: การคาดการณ์คอลัมน์สุดท้ายที่แต่ละคนมีรายได้มากกว่าหรือน้อยกว่า 50K ต่อปี อย่างไรก็ตามจากมุมมองของ tf.Transform ป้ายกำกับนี้เป็นเพียงคอลัมน์หมวดหมู่อื่น

เราใช้ TFXIO สำเร็จรูป BeamRecordCsvTFXIO เพื่อแปลบรรทัด CSV เป็น RecordBatches TFXIO ต้องการข้อมูลสำคัญสองส่วน:

  • TensorFlow Metadata Schema ที่มีข้อมูลประเภทและรูปร่างเกี่ยวกับคอลัมน์ CSV แต่ละคอลัมน์ TensorRepresentation s เป็นส่วนเสริมของ Schema หากไม่ได้ระบุไว้ (ซึ่งเป็นกรณีในตัวอย่างนี้) ข้อมูลเหล่านี้จะสรุปได้จากข้อมูลประเภทและรูปร่าง เราสามารถรับ Schema ได้โดยใช้ฟังก์ชันตัวช่วยที่เรามีให้เพื่อแปลจากข้อกำหนดการแยกวิเคราะห์ TF (แสดงในตัวอย่างนี้) หรือโดยการเรียกใช้ TensorFlow Data Validation

  • รายชื่อคอลัมน์ตามลำดับที่ปรากฏในไฟล์ CSV โปรดทราบว่าชื่อเหล่านั้นต้องตรงกับชื่อคุณลักษณะในสคีมา

ในตัวอย่างนี้เราอนุญาตให้ไม่มีคุณลักษณะ education-num ซึ่งหมายความว่าจะแสดงเป็น tf.io.VarLenFeature ใน feature_spec และเป็น tf.SparseTensor ใน preprocessing_fn คุณสมบัติอื่น ๆ จะกลายเป็น tf.Tensor ที่มีชื่อเดียวกันใน preprocessing_fn

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

โปรดทราบว่าเราต้องทำการแก้ไขเพิ่มเติมหลังจากอ่านบรรทัด CSV แล้วมิฉะนั้นเราสามารถพึ่งพา CsvTFXIO เพื่อจัดการทั้งการอ่านไฟล์และการแปลเป็น RecordBatch es:

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

การประมวลผลล่วงหน้าคล้ายกับตัวอย่างก่อนหน้านี้ยกเว้นฟังก์ชันก่อนการประมวลผลจะถูกสร้างขึ้นโดยโปรแกรมแทนที่จะระบุแต่ละคอลัมน์ด้วยตนเอง ในฟังก์ชันก่อนการประมวลผลด้านล่าง NUMERICAL_COLUMNS และ CATEGORICAL_COLUMNS เป็นรายการที่มีชื่อของคอลัมน์ตัวเลขและหมวดหมู่:

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

ความแตกต่างอย่างหนึ่งจากตัวอย่างก่อนหน้านี้คือคอลัมน์เลเบลระบุการแมปจากสตริงไปยังดัชนีด้วยตนเอง ดังนั้น '>50' จึงถูกจับคู่กับ 0 และ '<=50K' ถูกจับคู่กับ 1 เนื่องจากมีประโยชน์ในการทราบว่าดัชนีใดในแบบจำลองที่ได้รับการฝึกอบรมตรงกับป้ายกำกับใด

ตัวแปร record_batches แสดงถึง PCollection ของ pyarrow.RecordBatch es tensor_adapter_config กำหนดโดย csv_tfxio ซึ่งอนุมานจาก SCHEMA (และท้ายที่สุดในตัวอย่างนี้จากข้อกำหนดการแยกวิเคราะห์ TF)

ขั้นตอนสุดท้ายคือการเขียนข้อมูลที่แปลงแล้วลงในดิสก์และมีรูปแบบคล้ายกับการอ่านข้อมูลดิบ สคีมาที่ใช้ในการทำสิ่งนี้เป็นส่วนหนึ่งของเอาต์พุตของ AnalyzeAndTransformDataset ซึ่งอนุมาน schema สำหรับข้อมูลเอาต์พุต โค้ดที่จะเขียนลงดิสก์แสดงอยู่ด้านล่าง สคีมาเป็นส่วนหนึ่งของข้อมูลเมตา แต่ใช้ทั้งสองอย่างสลับกันใน tf.Transform API (เช่นส่งข้อมูลเมตาไปยัง ExampleProtoCoder ) โปรดทราบว่าสิ่งนี้เขียนในรูปแบบอื่น แทนที่จะใช้ textio.WriteToText ให้ใช้การสนับสนุนในตัวของ Beam สำหรับรูปแบบ TFRecord และใช้ coder เพื่อเข้ารหัสข้อมูลเป็น Example protos นี่เป็นรูปแบบที่ดีกว่าที่จะใช้สำหรับการฝึกอบรมดังแสดงในหัวข้อถัดไป transformed_eval_data_base จัดเตรียมชื่อไฟล์พื้นฐานสำหรับแต่ละชาร์ดที่เขียน

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

นอกเหนือจากข้อมูลการฝึกอบรมแล้ว transform_fn ยังเขียนด้วยข้อมูลเมตา:

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

รัน Beam pipeline ทั้งหมดด้วย p.run().wait_until_finish() จนถึงจุดนี้ไปป์ไลน์ Beam แสดงถึงการคำนวณแบบกระจายที่รอการตัดบัญชี มีคำแนะนำสำหรับสิ่งที่จะทำ แต่คำแนะนำยังไม่ถูกดำเนินการ การเรียกครั้งสุดท้ายนี้ดำเนินการไปป์ไลน์ที่ระบุ

ดาวน์โหลดชุดข้อมูลสำมะโน

ดาวน์โหลดชุดข้อมูลสำมะโนประชากรโดยใช้คำสั่งเชลล์ต่อไปนี้:

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

เมื่อรันสคริปต์ census_example.py ให้ส่งผ่านไดเร็กทอรีที่มีข้อมูลนี้เป็นอาร์กิวเมนต์แรก สคริปต์สร้างไดเร็กทอรีย่อยชั่วคราวเพื่อเพิ่มข้อมูลที่ประมวลผลล่วงหน้า

ผสานรวมกับการฝึกอบรม TensorFlow

ส่วนสุดท้ายของ census_example.py แสดงวิธีใช้ข้อมูลที่ประมวลผลล่วงหน้าเพื่อฝึกโมเดล ดูรายละเอียดใน เอกสารเครื่องมือประมาณการ ขั้นตอนแรกคือการสร้าง Estimator ซึ่งต้องมีคำอธิบายของคอลัมน์ที่ประมวลผลล่วงหน้า คอลัมน์ตัวเลขแต่ละคอลัมน์อธิบายว่าเป็น real_valued_column ซึ่งเป็นกระดาษห่อหุ้มรอบเวกเตอร์หนาแน่นที่มีขนาดคงที่ ( 1 ในตัวอย่างนี้) แต่ละคอลัมน์ที่จัดหมวดหมู่ถูกจับคู่จากสตริงเป็นจำนวนเต็มจากนั้นจะถูกส่งผ่านไปยัง indicator_column tft.TFTransformOutput ใช้เพื่อค้นหาเส้นทางของไฟล์คำศัพท์สำหรับแต่ละคุณลักษณะที่เป็นหมวดหมู่

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

ขั้นตอนต่อไปคือการสร้างตัวสร้างเพื่อสร้างฟังก์ชันอินพุตสำหรับการฝึกอบรมและการประเมินผล ความแตกต่างจากการฝึกอบรมที่ใช้โดย tf.Learn เนื่องจากไม่จำเป็นต้องใช้ข้อมูลจำเพาะคุณลักษณะเพื่อแยกวิเคราะห์ข้อมูลที่แปลงแล้ว ให้ใช้ข้อมูลเมตาสำหรับข้อมูลที่แปลงแล้วเพื่อสร้างคุณลักษณะเฉพาะ

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

โค้ดที่เหลือจะเหมือนกับการใช้คลาส Estimator ตัวอย่างนี้ยังมีโค้ดสำหรับเอ็กซ์พอร์ตโมเดลในรูปแบบ SavedModel Tensorflow Serving หรือ Cloud ML Engine สามารถใช้โมเดลที่ส่งออกได้