מדריך זה מתאר כיצד להשתמש ב-TFX API כדי לבנות רכיב מותאם אישית מלא. רכיבים מותאמים אישית לחלוטין מאפשרים לך לבנות רכיבים על ידי הגדרת מפרט הרכיבים, המבצעים ומחלקות ממשק הרכיבים. גישה זו מאפשרת לך לעשות שימוש חוזר ולהרחיב רכיב סטנדרטי שיתאים לצרכים שלך.
אם אתה חדש בצינורות TFX, למד עוד על מושגי הליבה של צינורות TFX .
מבצע מותאם אישית או רכיב מותאם אישית
אם יש צורך רק בלוגיקת עיבוד מותאמת אישית בעוד שמאפייני הקלט, הפלטים ומאפייני הביצוע של הרכיב זהים לרכיב קיים, די בביצוע מותאם אישית. יש צורך ברכיב מותאם אישית מלא כאשר כל אחד מהכניסות, הפלטים או מאפייני הביצוע שונים מכל רכיבי TFX קיימים.
איך יוצרים רכיב מותאם אישית?
פיתוח רכיב מותאם אישית מלא דורש:
- קבוצה מוגדרת של מפרטי קלט ופלט עבור הרכיב החדש. במיוחד, הסוגים של חפצי הקלט צריכים להיות עקביים עם סוגי חפצי הפלט של הרכיבים המייצרים את החפצים והסוגים עבור חפצי הפלט צריכים להיות עקביים עם סוגי חפצי הקלט של הרכיבים שצורכים את החפצים אם קיימים.
- פרמטרי הביצוע שאינם חפצים הדרושים עבור הרכיב החדש.
מפרט רכיבים
המחלקה ComponentSpec
מגדירה את חוזה הרכיב על ידי הגדרת חפצי הקלט והפלט לרכיב, כמו גם את הפרמטרים המשמשים לביצוע הרכיב. יש לו שלושה חלקים:
- INPUTS : מילון של פרמטרים מוקלדים עבור חפצי הקלט המועברים למבצע הרכיבים. בדרך כלל חפצי קלט הם הפלטים מרכיבים במעלה הזרם ולכן חולקים את אותו סוג.
- OUTPUTS : מילון של פרמטרים מוקלדים עבור חפצי הפלט שהרכיב מייצר.
- PARAMETERS : מילון של פריטי ExecutionParameter נוספים שיועברו למבצע הרכיבים. אלו הם פרמטרים שאינם חפצים שאנו רוצים להגדיר בצורה גמישה בצינור DSL ולהעביר לביצוע.
הנה דוגמה של ComponentSpec:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
מְבַצֵעַ
לאחר מכן, כתוב את קוד הביצוע עבור הרכיב החדש. בעיקרון, יש ליצור תת-מחלקה חדשה של base_executor.BaseExecutor
עם ביטול הפונקציה Do
שלו. בפונקציה Do
, הארגומנטים input_dict
, output_dict
ו- exec_properties
המועברים ב- INPUTS
, OUTPUTS
ו- PARAMETERS
המוגדרים ב-ComponentSpec בהתאמה. עבור exec_properties
, ניתן לאחזר את הערך ישירות דרך חיפוש מילון. עבור artifacts ב- input_dict
ו- output_dict
, קיימות פונקציות נוחות זמינות במחלקה artifact_utils שניתן להשתמש בהן כדי להביא מופע artifact או artifact uri.
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
בדיקת יחידה מבצע מותאם אישית
ניתן ליצור בדיקות יחידה עבור המבצע המותאם אישית בדומה לזה .
ממשק רכיבים
כעת, כשהחלק המורכב ביותר הושלם, השלב הבא הוא להרכיב את החלקים הללו לממשק רכיב, כדי לאפשר שימוש ברכיב בצינור. ישנם מספר שלבים:
- הפוך את ממשק הרכיב לתת-מחלקה של
base_component.BaseComponent
- הקצה משתנה מחלקה
SPEC_CLASS
עם המחלקהComponentSpec
שהוגדרה קודם לכן - הקצה משתנה מחלקה
EXECUTOR_SPEC
עם מחלקת ה-Executor שהוגדרה קודם לכן - הגדר את פונקציית הבנאי
__init__()
באמצעות הארגומנטים לפונקציה כדי לבנות מופע של המחלקה ComponentSpec ולהפעיל את פונקציית העל עם הערך הזה, יחד עם שם אופציונלי
כאשר נוצר מופע של הרכיב, לוגיקה של בדיקת סוג במחלקה base_component.BaseComponent
תופעל כדי לוודא שהארגומנטים שהועברו תואמים למידע הסוג שהוגדר במחלקה ComponentSpec
.
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
הרכיבו לתוך צינור TFX
השלב האחרון הוא לחבר את הרכיב המותאם אישית החדש לצינור TFX. מלבד הוספת מופע של הרכיב החדש, יש צורך גם בדברים הבאים:
- חברו אליו כראוי את הרכיבים במעלה הזרם והמורד של הרכיב החדש. זה נעשה על ידי התייחסות לתפוקות הרכיב במעלה הזרם ברכיב החדש והתייחסות לתפוקות של הרכיב החדש ברכיבים במורד הזרם
- הוסף את מופע הרכיבים החדש לרשימת הרכיבים בעת בניית הצינור.
הדוגמה להלן מדגישה את השינויים שהוזכרו לעיל. ניתן למצוא דוגמה מלאה בריפו של TFX GitHub .
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
פרוס רכיב מותאם אישית מלא
מלבד שינויי קוד, כל החלקים החדשים שנוספו ( ComponentSpec
, Executor
, component interface) צריכים להיות נגישים בסביבת ריצת צינורות על מנת להפעיל את הצינור כראוי.