הצטרף לקהילת SIG TFX-Addons ועזור לשפר את TFX!

התחל בעבודה עם TensorFlow Transform

מדריך זה מציג את המושגים הבסיסיים של tf.Transform וכיצד להשתמש בהם. זה יהיה:

  • הגדר פונקציית עיבוד מקדים , תיאור לוגי של הצינור ההופך את הנתונים הגולמיים לנתונים המשמשים להכשרת מודל למידת מכונה.
  • הראה את יישום ה- Apache Beam המשמש לשינוי נתונים על ידי המרת פונקציית העיבוד המקדים לצינור Beam .
  • הראה דוגמאות לשימוש נוספות.

הגדר פונקציית עיבוד מקדים

פונקציית tf.Transform המקדים היא המושג החשוב ביותר של tf.Transform . פונקציית העיבוד המקדים היא תיאור הגיוני של טרנספורמציה של מערך הנתונים. פונקציית עיבוד מקדים מקבלת ומחזירה מילון של טנזורים, שבו מותח אומר Tensor או SparseTensor . ישנם שני סוגים של פונקציות המשמשות להגדרת פונקציית העיבוד המקדים:

  1. כל פונקציה שמקבלת ומחזירה טנזורים. אלה מוסיפים גרף פעולות TensorFlow שהופכות נתונים גולמיים לנתונים שהופכו.
  2. כל אחד המנתחים שמספקים tf.Transform . מנתחים מקבלים ומחזירים גם טנזורים, אך בניגוד לפונקציות TensorFlow, הם לא מוסיפים פעולות לגרף. במקום זאת, מנתחים גורמים ל- tf.Transform לחשב פעולת מעבר מלא מחוץ ל- TensorFlow. הם משתמשים בערכי טנסור הקלט על פני מערך הנתונים כולו כדי ליצור טנסור קבוע המוחזר כפלט. לדוגמה, tft.min מחשבת מינימום של טנסור על פני מערך הנתונים. tf.Transform מספק קבוצה מנתחים קבועה, אך זו תורחב בגרסאות עתידיות.

דוגמה לפונקציה לעיבוד מקדים

על ידי שילוב של מנתחים ופונקציות רגילות של TensorFlow, משתמשים יכולים ליצור צינורות גמישים לשינוי נתונים. הפונקציה הבאה לעיבוד מקדים הופכת כל אחת משלוש התכונות בדרכים שונות ומשלבת שתיים מהתכונות:

import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam

def preprocessing_fn(inputs):
  x = inputs['x']
  y = inputs['y']
  s = inputs['s']
  x_centered = x - tft.mean(x)
  y_normalized = tft.scale_to_0_1(y)
  s_integerized = tft.compute_and_apply_vocabulary(s)
  x_centered_times_y_normalized = x_centered * y_normalized
  return {
      'x_centered': x_centered,
      'y_normalized': y_normalized,
      'x_centered_times_y_normalized': x_centered_times_y_normalized,
      's_integerized': s_integerized
  }

כאן, x , y ו- s הם Tensor s המייצגים תכונות קלט. הטנסור החדש הראשון שנוצר, x_centered , נבנה על ידי החלת tft.mean ל- x tft.mean מ- x . tft.mean(x) מחזיר טנסור המייצג את ממוצע הטנסור x . x_centered הוא טנסור x עם חיסור הממוצע.

הטנסור החדש השני, y_normalized , נוצר באופן דומה אך בשיטת הנוחות tft.scale_to_0_1 . שיטה זו עושה משהו הדומה למחשוב x_centered , כלומר חישוב מקסימום ומינימום ושימוש בהם כדי לשנות את קנה המידה של y .

טנסור s_integerized מראה דוגמה למניפולציה בחוטים. במקרה זה, אנו לוקחים מחרוזת וממפים אותה למספר שלם. זה משתמש בפונקציית הנוחותtft.compute_and_apply_vocabulary . פונקציה זו משתמשת במנתח כדי לחשב את הערכים הייחודיים שנלקחו על ידי מחרוזות הקלט, ואז משתמשת בפעולות TensorFlow כדי להמיר את מחרוזות הקלט למדדים בטבלת הערכים הייחודיים.

העמודה הסופית מראה כי ניתן להשתמש בפעולות TensorFlow כדי ליצור תכונות חדשות על ידי שילוב טנזורים.

פונקציית העיבוד המקדים מגדירה צינור פעולות במערך נתונים. על מנת ליישם את הצינור, אנו מסתמכים על יישום קונקרטי של ממשק ה- API של tf.Transform . יישום ה- Apache Beam מספק PTransform המחיל את פונקציית PTransform המוקדם של המשתמש על נתונים. זרימת העבודה האופיינית של משתמש tf.Transform תבנה פונקציית עיבוד מקדים, tf.Transform זאת בצינור Beam גדול יותר, ותיצור את הנתונים לאימון.

אצווה

אצווה היא חלק חשוב מ- TensorFlow. מכיוון שאחת המטרות של tf.Transform היא לספק גרף TensorFlow לעיבוד מקדים שניתן לשלב בתרשים ההגשה (ואופציה, גרף האימונים), אצווה היא גם מושג חשוב ב- tf.Transform .

אמנם לא מובן מאליו בדוגמה שלמעלה, אך פונקציית העיבוד המקדים המוגדרת על ידי המשתמש עוברת על ידי טנזורים המייצגים קבוצות ולא מקרים בודדים, כפי שקורה במהלך אימון ושירות עם TensorFlow. מצד שני, מנתחים מבצעים חישוב על פני כל מערך הנתונים המחזיר ערך יחיד ולא אצווה של ערכים. x הוא Tensor עם צורה של (batch_size,) , ואילו tft.mean(x) הוא Tensor עם צורה של () . החיסור x - tft.mean(x) משדר כאשר הערך של tft.mean(x) מופחת מכל רכיב אצווה המיוצג על ידי x .

יישום קרן אפאצ'י

בעוד שפונקציית העיבוד המקדים מיועדת כתיאור לוגי של צינור עיבוד מקדים המיושם במסגרות עיבוד נתונים מרובות, tf.Transform מספק יישום קנוני המשמש ב- Apache Beam. יישום זה מדגים את הפונקציונליות הנדרשת מיישום. אין ממשק API רשמי לפונקציונליות זו, ולכן כל יישום יכול להשתמש ב- API שהוא אידיומטי למסגרת עיבוד הנתונים הספציפית שלו.

יישום ה- Apache Beam מספק שני PTransform המשמשים לעיבוד נתונים עבור פונקציית עיבוד מקדים. להלן הצגת השימוש עבור PTransform AnalyzeAndTransformDataset :

raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'}
]

raw_data_metadata = ...
transformed_dataset, transform_fn = (
    (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
        preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset

התוכן transformed_data מוצג למטה ומכיל את העמודות שהופכו באותה פורמט של הנתונים הגולמיים. בפרט, הערכים של s_integerized הם [0, 1, 0] - ערכים אלה תלויים באופן שבו מילות המילה hello world מומו למספרים שלמים, וזה דטרמיניסטי. בעמודה x_centered , הורדנו את הממוצע כך שערכי העמודה x , שהיו [1.0, 2.0, 3.0] , הפכו ל- [-1.0, 0.0, 1.0] . באופן דומה, שאר העמודות תואמות לערכים הצפויים שלהם.

[{u's_integerized': 0,
  u'x_centered': -1.0,
  u'x_centered_times_y_normalized': -0.0,
  u'y_normalized': 0.0},
 {u's_integerized': 1,
  u'x_centered': 0.0,
  u'x_centered_times_y_normalized': 0.0,
  u'y_normalized': 0.5},
 {u's_integerized': 0,
  u'x_centered': 1.0,
  u'x_centered_times_y_normalized': 1.0,
  u'y_normalized': 1.0}]

הן raw_data והן transformed_data הם מערכי נתונים. שני הסעיפים הבאים מראים כיצד יישום ה- Beam מייצג מערכי נתונים וכיצד לקרוא ולכתוב נתונים לדיסק. ערך ההחזר האחר, transform_fn , מייצג את השינוי המיושם על הנתונים, המכוסה בפירוט להלן.

AnalyzeAndTransformDataset הוא ההרכב של שני הטרנספורמציות הבסיסיות הניתנות על ידי היישום AnalyzeDataset ו- TransformDataset . אז שני קטעי הקוד הבאים שווים:

transformed_data, transform_fn = (
    my_data | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
transform_fn = my_data | tft_beam.AnalyzeDataset(preprocessing_fn)
transformed_data = (my_data, transform_fn) | tft_beam.TransformDataset()

transform_fn היא פונקציה טהורה המייצגת פעולה המוחלת על כל שורה במערך הנתונים. בפרט, ערכי המנתח כבר מחושבים ומטופלים כקבועים. בדוגמה, ה- transform_fn מכיל כקבועים את הממוצע של העמודה x , את המינימום והמקסימום של העמודה y ואת אוצר המילים המשמש למיפוי המיתרים למספרים שלמים.

תכונה חשובה של tf.Transform היא כי transform_fn מייצג מפה על פני שורות - זו פונקציה טהורה המופעלת על כל שורה בנפרד. כל החישובים לצבירת שורות נעשים ב- AnalyzeDataset . יתר על כן, transform_fn מיוצג TensorFlow Graph שיכול להיות מוטבע לתוך גרף ההגשה.

AnalyzeAndTransformDataset מסופק לצורך אופטימיזציות במקרה מיוחד זה. זוהי אותה תבנית בשימוש scikit-ללמוד , מתן fit , transform , ו fit_transform שיטות.

תבניות נתונים וסכמה

יישום TFT Beam מקבל שני פורמטים שונים של נתוני קלט. הפורמט "dict dict" (כפי שנראה בדוגמה לעיל וב- simple_example.py ) הוא פורמט אינטואיטיבי ומתאים לערכות נתונים קטנות ואילו הפורמט TFXIO ( Apache Arrow ) מספק ביצועים משופרים ומתאים למערכי נתונים גדולים.

יישום ה- Beam מספר באיזה פורמט ה- PCollection קלט יהיה על ידי "המטא-נתונים" הנלווים ל- PCollection:

(raw_data, raw_data_metadata) | tft.AnalyzeDataset(...)
  • אם raw_data_metadata הוא dataset_metadata.DatasetMetadata (ראה להלן, סעיף "פורמט 'dict dict'"), אז raw_data צפויה להיות בתבנית "dict dict".
  • אם raw_data_metadata הוא tfxio.TensorAdapterConfig (ראה להלן, החלק "פורמט TFXIO"), אז raw_data צפויה להיות בפורמט TFXIO.

הפורמט "dict dict"

בדוגמאות הקוד הקודמות, הקוד שמגדיר raw_data_metadata מושמט. המטא נתונים מכילים את הסכימה המגדירה את פריסת הנתונים כך שהם נקראים ונכתבים בפורמטים שונים. אפילו הפורמט בזיכרון המוצג בחלק האחרון אינו מתאר את עצמו ומחייב את הסכימה על מנת להתפרש כטנזורים.

להלן ההגדרה של הסכימה לנתוני הדוגמה:

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils

raw_data_metadata = dataset_metadata.DatasetMetadata(
      schema_utils.schema_from_feature_spec({
        's': tf.io.FixedLenFeature([], tf.string),
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
    }))

פרוטו Schema מכיל את המידע הדרוש לניתוח הנתונים מהפורמט שעל הדיסק או מהזיכרון, לטנורים. זה נבנה בדרך כלל על ידי קריאה ל- schema_utils.schema_from_feature_spec עם מקשי תכונות למיפוי tf.io.FixedLenFeature tf.io.VarLenFeature tf.io.FixedLenFeature , tf.io.VarLenFeature ו- tf.io.SparseFeature . עיין בתיעוד עבור tf.parse_example לפרטים נוספים.

למעלה אנו משתמשים ב- tf.io.FixedLenFeature כדי לציין שכל תכונה מכילה מספר קבוע של ערכים, במקרה זה ערך סקלרי יחיד. מכיוון tf.Transform מקרים אצוות, הממשי Tensor המייצג את התכונה תהיה צורה (None,) כאשר המאפיין ידוע הוא הממד אצווה.

פורמט TFXIO

עם פורמט זה, הנתונים צפויים להיכלל ב pyarrow.RecordBatch . לנתונים בטבלאות, יישום ה- Apache Beam שלנו מקבל את החצים של RecordBatch המורכבים מעמודות מהסוגים הבאים:

  • pa.list_(<primitive>) , כאשר <primitive> הוא pa.int64() , pa.float32() pa.binary() או pa.large_binary() .

  • pa.large_list(<primitive>)

מערך קלט הצעצועים בו השתמשנו לעיל, כשהוא מיוצג כ- RecordBatch , נראה כך:

raw_data = [
    pa.record_batch([
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([[1], [2], [3]], pa.list_(pa.float32())),
        pa.array([['hello'], ['world'], ['hello']], pa.list_(pa.binary())),
    ], ['x', 'y', 's'])
]

בדומה ל- DatasetMetadata הדרוש לליווי הפורמט "dict dict", יש צורך ב- tfxio.TensorAdapterConfig כדי ללוות את ה- RecordBatch . הוא מורכב RecordBatch החץ של ה- RecordBatch es ו- TensorRepresentations כדי לקבוע באופן ייחודי כיצד ניתן לפרש עמודות ב- RecordBatch es כ- TensorFlow Tensors (כולל אך לא מוגבל ל- tf. Tensor, tf.SparseTensor).

TensorRepresentations הוא Dict[Text, TensorRepresentation] את הקשר בין RecordBatch שמקבל preprocessing_fn לבין עמודות ב- RecordBatch es. לדוגמה:

tensor_representation = {
    'x': text_format.Parse(
        """dense_tensor { column_name: "col1" shape { dim { size: 2 } } }"""
        schema_pb2.TensorRepresentation())
}

פירושו שהקלטים inputs['x'] ב- preprocessing_fn צריכים להיות tf.Tensor צפוף, שערכיו מגיעים מעמודה של השם 'col1' בקלט ה- RecordBatch קלט, וצורתו [batch_size, 2] ) צריכה להיות [batch_size, 2] .

TensorRepresentation הוא Protobuf המוגדר במטא נתונים של TensorFlow .

תאימות עם TensorFlow

tf.Transform מספק תמיכה בייצוא ה- transform_fn לעיל בתור TF 1.x או TF 2.x SavedModel. התנהגות ברירת המחדל לפני המהדורה 0.30 ייצאה TF 1.x SavedModel. החל משחרור 0.30 , התנהגות ברירת המחדל היא ייצוא TF 2.x SavedModel אלא אם כן התנהגויות TF 2.x אינן מושבתות במפורש (על ידי קריאה ל- tf.compat.v1.disable_v2_behavior() למשל).

אם אתה משתמש במושגי TF 1.x כגון Estimators and Sessions , אתה יכול לשמור על ההתנהגות הקודמת על ידי העברת force_tf_compat_v1=True ל- tft_beam.Context אם משתמשים ב- tf.Transform כספרייה עצמאית או לרכיב ה- Transform ב- TFX.

בעת ייצוא ה- transform_fn כ- TF 2.x SavedModel, ניתן לעקוב אחר ה- preprocessing_fn באמצעות tf.function . בנוסף, אם אתה מריץ את הצינור שלך מרחוק (למשל עם DataflowRunner ), ודא ש- preprocessing_fn וכל התלות ארוזים כראוי כמתואר כאן .

בעיות ידועות בשימוש tf.Transform לייצוא TF 2.x SavedModel מתועדות כאן .

קלט ופלט באמצעות Apache Beam

עד כה ראינו נתוני קלט ופלט ברשימות פיתון (של RecordBatch es או מילוני מופע). זהו פשט שמסתמך על יכולתו של Apache Beam לעבוד עם רשימות כמו גם על ייצוג הנתונים העיקרי שלה, ה- PCollection .

PCollection הוא ייצוג נתונים המהווה חלק מצינור Beam. צינור Beam נוצר על ידי יישום של PTransform שונים, כולל AnalyzeDataset ו- TransformDataset , והפעלת הצינור. זיכרון PCollection אינו נוצר בזיכרון הבינארי הראשי, אלא מופץ בין העובדים (אם כי סעיף זה משתמש במצב הביצוע בזיכרון).

טרום משומר PCollection מקורות ( TFXIO )

פורמט RecordBatch שמקבל היישום שלנו הוא פורמט נפוץ שמקבלים ספריות TFX אחרות. לכן TFX מציעה "מקורות" נוחים (aka TFXIO ) שקוראים קבצים בפורמטים שונים בדיסק ומייצרים RecordBatch ויכולים גם לתת TensorAdapterConfig , כולל TensorRepresentations שהוסקו.

את אותם TFXIO אפשר למצוא בחבילה tfx_bsl ( tfx_bsl.public.tfxio ).

דוגמה: מערך הנתונים "Census Income"

הדוגמה הבאה דורש הן קריאה וכתיבה הנתונים בדיסק וייצוג נתונים כמו PCollection (לא ברשימה), ראה: census_example.py . להלן אנו מראים כיצד ניתן להוריד את הנתונים ולהפעיל דוגמה זו. מערך הנתונים "Census Income" מסופק על ידי UCI Machine Learning Repository . מערך נתונים זה מכיל נתונים קטגוריים ונתונים מספריים.

הנתונים הם בפורמט CSV, להלן שתי השורות הראשונות:

39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K
50, Self-emp-not-inc, 83311, Bachelors, 13, Married-civ-spouse, Exec-managerial, Husband, White, Male, 0, 0, 13, United-States, <=50K

העמודות של מערך הנתונים הן קטגוריות או מספריות. מערך נתונים זה מתאר בעיית סיווג: חיזוי העמודה האחרונה בה האדם מרוויח יותר או פחות מ- 50 אלף בשנה. עם זאת, מנקודת המבט של tf.Transform , תווית זו היא רק עוד טור קטגורי.

אנו משתמשים ב- TFXIO משומר TFXIO , BeamRecordCsvTFXIO כדי לתרגם את קווי ה- CSV ל- RecordBatches . TFXIO דורש שני מידע חשוב:

  • סכימת מטא נתונים של TensorFlow המכילה מידע על סוג וצורה על כל עמודת CSV. TensorRepresentation s הם חלק אופציונלי TensorRepresentation ; אם לא מסופק (וזה המקרה בדוגמה זו), הם יוסקו ממידע הסוג והצורה. ניתן להשיג את הסכימה באמצעות פונקציית עוזר שאנו מספקים לתרגום ממפרט ניתוח TF (מוצג בדוגמה זו), או על ידי הפעלת אימות נתונים של TensorFlow .

  • רשימה של שמות עמודות, לפי סדר הופעתן בקובץ ה- CSV. שים לב ששמות אלה חייבים להתאים לשמות התכונות בסכמה.

בדוגמה זו אנו מאפשרים education-num בתכונה education-num . אמצעי זה כי זה מיוצג tf.io.VarLenFeature ב feature_spec, וכתוצאה tf.SparseTensor ב preprocessing_fn . תכונות אחרות יהפכו tf.Tensor s עם אותו שם ב- preprocessing_fn .

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format='text', column_names=ordered_columns, schema=SCHEMA)

record_batches = (
    p
    | 'ReadTrainData' >> textio.ReadFromText(train_data_file)
    | ...  # fix up csv lines
    | 'ToRecordBatches' >> csv_tfxio.BeamSource())

tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

שים לב שהיינו חייבים לבצע תיקונים נוספים לאחר קריאת שורות ה- CSV. אחרת נוכל להסתמך על ה- CsvTFXIO לטיפול בקריאת הקבצים ובתרגום ל- RecordBatch :

csv_tfxio = tfxio.CsvTFXIO(train_data_file, column_name=ordered_columns,
                           schema=SCHEMA)
record_batches = p | 'TFXIORead' >> csv_tfxio.BeamSource()
tensor_adapter_config = csv_tfxio.TensorAdapterConfig()

עיבוד מקדים דומה לדוגמא הקודמת, למעט פונקציית העיבוד המקדים נוצרת באופן פרוגרמטי במקום לציין ידנית כל עמודה. בפונקציית העיבוד המקדים למטה, NUMERICAL_COLUMNS ו- CATEGORICAL_COLUMNS הן רשימות המכילות את שמות העמודות המספריות והקטגוריות:

def preprocessing_fn(inputs):
  """Preprocess input columns into transformed columns."""
  # Since we are modifying some features and leaving others unchanged, we
  # start by setting `outputs` to a copy of `inputs.
  outputs = inputs.copy()

  # Scale numeric columns to have range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])

  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    # This is a SparseTensor because it is optional. Here we fill in a default
    # value when it is missing.
      sparse = tf.sparse.SparseTensor(outputs[key].indices, outputs[key].values,
                                      [outputs[key].dense_shape[0], 1])
      dense = tf.sparse.to_dense(sp_input=sparse, default_value=0.)
    # Reshaping from a batch of vectors of size 1 to a batch to scalars.
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)

  # For all categorical columns except the label column, we generate a
  # vocabulary but do not modify the feature.  This vocabulary is instead
  # used in the trainer, by means of a feature column, to convert the feature
  # from a string to an integer id.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)

  # For the label column we provide the mapping from string to index.
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=['>50K', '<=50K'],
      values=tf.cast(tf.range(2), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)

  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])

  return outputs

הבדל אחד מהדוגמה הקודמת הוא עמודת התווית המציינת ידנית את המיפוי מהמחרוזת לאינדקס. אז '>50' ממופה ל 0 ו- '<=50K' ממופה ל 1 מכיוון שכדאי לדעת איזה אינדקס במודל המאומן תואם לאיזה תווית.

record_batches משתנה מייצג PCollection של pyarrow.RecordBatch es. tensor_adapter_config ניתן על ידי csv_tfxio , csv_tfxio מ- SCHEMA (ובסופו של דבר, בדוגמה זו, ממפרט הניתוח של TF).

השלב האחרון הוא לכתוב את הנתונים שהופכו לדיסק ויש לו צורה דומה לקריאת הנתונים הגולמיים. הסכימה המשמשת לשם כך היא חלק מהפלט של AnalyzeAndTransformDataset המציג סכמה עבור נתוני הפלט. הקוד לכתיבה לדיסק מוצג להלן. הסכימה היא חלק מהמטא-נתונים אך משתמשת בשניים להחלפה בממשק ה- API של tf.Transform (כלומר מעבירים את המטא-נתונים ל- ExampleProtoCoder ). שים לב שהדבר כותב במתכונת אחרת. במקום textio.WriteToText , השתמש בתמיכה המובנית של Beam בתבנית TFRecord כדי לקודד את הנתונים כ- protos Example . זהו פורמט טוב יותר לשימוש לאימונים, כפי שמוצג בסעיף הבא. transformed_eval_data_base מספק את שם הקובץ הבסיסי לרסיסים בודדים שנכתבים.

transformed_data | "WriteTrainData" >> tfrecordio.WriteToTFRecord(
    transformed_eval_data_base,
    coder=tft.coders.ExampleProtoCoder(transformed_metadata))

בנוסף לנתוני האימון, transform_fn נכתב גם עם המטא נתונים:

_ = (
    transform_fn
    | 'WriteTransformFn' >> tft_beam.WriteTransformFn(working_dir))
transformed_metadata | 'WriteMetadata' >> tft_beam.WriteMetadata(
    transformed_metadata_file, pipeline=p)

הפעל את כל צינור ה- Beam עם p.run().wait_until_finish() . עד לנקודה זו, צינור ה- Beam מייצג חישוב מבוזר ונדחה. הוא מספק הוראות לנעשה, אך ההוראות לא בוצעו. שיחה סופית זו מבצעת את הצינור שצוין.

הורד את מערך המפקד

הורד את מערך המפקד באמצעות פקודות הפגז הבאות:

  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data
  wget https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test

בעת הפעלת סקריפט census_example.py , העבירו את הספריה המכילה נתונים אלה כארגומנט הראשון. הסקריפט יוצר ספריה משנה זמנית להוספת הנתונים המעובדים מראש.

השתלב עם אימון TensorFlow

החלק האחרון של census_example.py מראה כיצד משתמשים בנתונים census_example.py מראש להכשרת מודל. לפרטים, עיין בתיעוד האומדנים . השלב הראשון הוא בניית Estimator שמצריך תיאור של העמודות שעובדו מראש. כל עמודה מספרית מתוארת real_valued_column שהיא עטיפה סביב וקטור צפוף בגודל קבוע ( 1 בדוגמה זו). כל עמודה קטגורית ממופה ממחרוזת למספרים שלמים ואז מועברת לעמודה indicator_column . tft.TFTransformOutput משמש למציאת נתיב קובץ אוצר המילים עבור כל תכונה קטגורית.

real_valued_columns = [feature_column.real_valued_column(key)
                       for key in NUMERIC_FEATURE_KEYS]

one_hot_columns = [
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_file(
            key=key,
            vocabulary_file=tf_transform_output.vocabulary_file_by_name(
                vocab_filename=key)))
    for key in CATEGORICAL_FEATURE_KEYS]

estimator = tf.estimator.LinearClassifier(real_valued_columns + one_hot_columns)

השלב הבא הוא ליצור בונה ליצירת פונקציית הקלט להכשרה והערכה. שונה tf.Learn בה השתמשו tf.Learn מכיוון tf.Learn תכונה אינו נדרש tf.Learn . במקום זאת, השתמש במטא-נתונים עבור הנתונים שהשתנו כדי ליצור מפרט תכונות.

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  ...
  def input_fn():
    """Input function for training and eval."""
    dataset = tf.data.experimental.make_batched_features_dataset(
        ..., tf_transform_output.transformed_feature_spec(), ...)

    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    ...

  return input_fn

הקוד שנותר זהה לשימוש במחלקת Estimator . הדוגמה מכילה גם קוד לייצוא המודל בפורמט SavedModel . המודל המיוצא יכול לשמש את Tensorflow Serving או את מנוע ה- ML ML .