يصف هذا الدليل كيفية استخدام واجهة برمجة تطبيقات TFX لإنشاء مكون مخصص بالكامل. تتيح لك المكونات المخصصة بالكامل إنشاء مكونات من خلال تحديد مواصفات المكون والمنفذ وفئات واجهة المكون. يتيح لك هذا الأسلوب إعادة استخدام المكون القياسي وتوسيعه ليلائم احتياجاتك.
إذا كنت جديدًا على خطوط أنابيب TFX ، فتعرف على المزيد حول المفاهيم الأساسية لخطوط أنابيب TFX .
المنفذ المخصص أو المكون المخصص
إذا كانت هناك حاجة إلى منطق معالجة مخصص فقط بينما تكون المدخلات والمخرجات وخصائص التنفيذ للمكون هي نفسها المكون الحالي ، يكون المنفذ المخصص كافيًا. هناك حاجة إلى مكون مخصص بالكامل عندما يختلف أي من المدخلات أو المخرجات أو خصائص التنفيذ عن أي مكونات TFX موجودة.
كيفية إنشاء مكون مخصص؟
يتطلب تطوير مكون مخصص بالكامل:
- مجموعة محددة من مواصفات المدخلات والمخرجات للمكون الجديد. بشكل خاص ، يجب أن تكون أنواع الأدوات المدخلة متسقة مع أنواع القطع الأثرية للمكونات التي تنتج القطع الأثرية وأنواع المخرجات يجب أن تكون متسقة مع أنواع الأداة المدخلة للمكونات التي تستهلك القطع الأثرية إن وجدت.
- معاملات التنفيذ غير المصطنعة المطلوبة للمكون الجديد.
المكون
تحدد فئة ComponentSpec
عقد المكون من خلال تحديد عناصر الإدخال والإخراج للمكون بالإضافة إلى المعلمات المستخدمة لتنفيذ المكون. تتكون من ثلاثة أجزاء:
- المدخلات : قاموس للمعلمات المكتوبة لعناصر الإدخال التي يتم تمريرها إلى منفذ المكون. عادةً ما تكون عناصر الإدخال عبارة عن نواتج من مكونات أولية وبالتالي تشترك في نفس النوع.
- المخرجات : قاموس للمعلمات المكتوبة لنواتج المخرجات التي ينتجها المكون.
- المعلمات : قاموس عناصر ExecutionParameter الإضافية التي سيتم تمريرها إلى منفذ المكون. هذه هي المعلمات غير المصنوعة يدويًا التي نريد تحديدها بمرونة في خط الأنابيب DSL وتمريرها إلى التنفيذ.
فيما يلي مثال على ComponentSpec:
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
المنفذ
بعد ذلك ، اكتب رمز المنفذ للمكون الجديد. بشكل أساسي ، يجب إنشاء فئة فرعية جديدة من base_executor.BaseExecutor
مع تجاوز وظيفة Do
في دالة Do
، الوسيطات input_dict
و output_dict
و exec_properties
التي تم تمريرها في الخريطة إلى INPUTS
و OUTPUTS
و PARAMETERS
التي تم تحديدها في ComponentSpec على التوالي. بالنسبة إلى exec_properties
، يمكن جلب القيمة مباشرة من خلال البحث في القاموس. بالنسبة للعناصر الأثرية في input_dict
و output_dict
، هناك وظائف ملائمة متاحة في فئة artifact_utils التي يمكن استخدامها لجلب مثيل الأداة أو uri الخاص بالقطعة الأثرية.
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
وحدة اختبار المنفذ المخصص
يمكن إنشاء اختبارات الوحدة للمنفذ المخصص على غرار هذا الاختبار.
واجهة المكون
الآن وقد اكتمل الجزء الأكثر تعقيدًا ، فإن الخطوة التالية هي تجميع هذه القطع في واجهة مكون ، لتمكين استخدام المكون في خط الأنابيب. هناك عدة خطوات:
- اجعل واجهة المكون فئة فرعية من
base_component.BaseComponent
- قم بتعيين
SPEC_CLASS
متغير فئة مع فئةComponentSpec
التي تم تعريفها مسبقًا - قم بتعيين متغير فئة
EXECUTOR_SPEC
مع فئة Executor التي تم تعريفها مسبقًا - حدد
__init__()
باستخدام الوسيطات الخاصة بالدالة لإنشاء مثيل لفئة ComponentSpec واستدعاء الوظيفة الفائقة بهذه القيمة ، جنبًا إلى جنب مع اسم اختياري
عند إنشاء مثيل للمكون ، اكتب التحقق من المنطق في base_component.BaseComponent
سيتم استدعاء الفئة للتأكد من أن الوسائط التي تم تمريرها متوافقة مع معلومات النوع المحددة في فئة ComponentSpec
.
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
تجميع في خط أنابيب TFX
الخطوة الأخيرة هي توصيل المكون المخصص الجديد بخط أنابيب TFX. بالإضافة إلى إضافة مثيل للمكون الجديد ، هناك حاجة أيضًا إلى ما يلي:
- قم بتوصيل مكونات المنبع والمصب للمكون الجديد بشكل صحيح. يتم ذلك عن طريق الرجوع إلى مخرجات المكون الرئيسي في المكون الجديد والإشارة إلى مخرجات المكون الجديد في المكونات النهائية
- أضف مثيل المكون الجديد إلى قائمة المكونات عند إنشاء خط الأنابيب.
يسلط المثال أدناه الضوء على التغييرات المذكورة أعلاه. يمكن العثور على المثال الكامل في TFX GitHub repo .
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
نشر مكون مخصص بالكامل
بجانب تغييرات الكود ، يجب أن تكون جميع الأجزاء المضافة حديثًا ( ComponentSpec
، Executor
، واجهة المكون) متاحة في بيئة تشغيل خط الأنابيب من أجل تشغيل خط الأنابيب بشكل صحيح.