بناء مكونات مخصصة بالكامل

يصف هذا الدليل كيفية استخدام TFX API لإنشاء مكون مخصص بالكامل. تتيح لك المكونات المخصصة بالكامل إنشاء المكونات عن طريق تحديد مواصفات المكونات والمنفذ وفئات واجهة المكونات. يتيح لك هذا الأسلوب إعادة استخدام مكون قياسي وتوسيعه ليناسب احتياجاتك.

إذا كنت جديدًا في استخدام خطوط أنابيب TFX، فتعرف على المزيد حول المفاهيم الأساسية لخطوط أنابيب TFX .

منفذ مخصص أو مكون مخصص

إذا كانت هناك حاجة إلى منطق معالجة مخصص فقط في حين أن المدخلات والمخرجات وخصائص التنفيذ للمكون هي نفس المكون الموجود، فإن المنفذ المخصص يكون كافيًا. هناك حاجة إلى مكون مخصص بالكامل عندما يختلف أي من المدخلات أو المخرجات أو خصائص التنفيذ عن أي مكونات TFX موجودة.

كيفية إنشاء مكون مخصص؟

يتطلب تطوير مكون مخصص بالكامل ما يلي:

  • مجموعة محددة من مواصفات المدخلات والمخرجات للمكون الجديد. على وجه الخصوص، يجب أن تكون أنواع المصنوعات اليدوية متسقة مع أنواع المصنوعات اليدوية للمكونات التي تنتج المصنوعات وأنواع المصنوعات اليدوية للمخرجات يجب أن تكون متسقة مع أنواع المصنوعات اليدوية للمكونات التي تستهلك المصنوعات إن وجدت.
  • معلمات التنفيذ غير المصطنعة المطلوبة للمكون الجديد.

مواصفات المكونات

تحدد فئة ComponentSpec عقد المكون عن طريق تحديد عناصر الإدخال والإخراج للمكون بالإضافة إلى المعلمات المستخدمة لتنفيذ المكون. لها ثلاثة أجزاء:

  • المدخلات : قاموس المعلمات المكتوبة لعناصر الإدخال التي يتم تمريرها إلى منفذ تنفيذ المكون. عادةً ما تكون عناصر الإدخال هي المخرجات من المكونات الأولية وبالتالي تشترك في نفس النوع.
  • المخرجات : قاموس المعلمات المكتوبة لعناصر الإخراج التي ينتجها المكون.
  • المعلمات : قاموس لعناصر معلمات التنفيذ الإضافية التي سيتم تمريرها إلى منفذ تنفيذ المكون. هذه معلمات غير صناعية نريد تحديدها بمرونة في خط أنابيب DSL وتمريرها إلى التنفيذ.

فيما يلي مثال على ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

المنفذ

بعد ذلك، اكتب رمز المنفذ للمكون الجديد. بشكل أساسي، يجب إنشاء فئة فرعية جديدة من base_executor.BaseExecutor مع تجاوز وظيفة Do الخاصة بها. في الدالة Do ، يتم تعيين الوسيطات input_dict و output_dict و exec_properties إلى INPUTS و OUTPUTS و PARAMETERS التي تم تعريفها في ComponentSpec على التوالي. بالنسبة إلى exec_properties ، يمكن جلب القيمة مباشرة من خلال البحث في القاموس. بالنسبة للعناصر الموجودة في input_dict و output_dict ، توجد وظائف مناسبة متاحة في فئة artifact_utils والتي يمكن استخدامها لجلب مثيل قطعة أثرية أو معرف uri للقطعة الأثرية.

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

وحدة اختبار منفذ مخصص

يمكن إنشاء اختبارات الوحدة للمنفذ المخصص بشكل مشابه لهذا الاختبار .

واجهة المكون

الآن بعد أن اكتمل الجزء الأكثر تعقيدًا، فإن الخطوة التالية هي تجميع هذه القطع في واجهة مكون، لتمكين استخدام المكون في خط الأنابيب. هناك عدة خطوات:

  • اجعل واجهة المكون فئة فرعية من base_component.BaseComponent
  • قم بتعيين متغير فئة SPEC_CLASS مع فئة ComponentSpec التي تم تعريفها مسبقًا
  • قم بتعيين متغير فئة EXECUTOR_SPEC مع فئة Executor التي تم تعريفها مسبقًا
  • قم بتعريف الدالة المُنشئة __init__() باستخدام وسيطات الدالة لإنشاء مثيل لفئة ComponentSpec واستدعاء الدالة الفائقة بهذه القيمة، بالإضافة إلى اسم اختياري

عند إنشاء مثيل للمكون، اكتب منطق التحقق في فئة base_component.BaseComponent سيتم استدعاؤه للتأكد من أن الوسيطات التي تم تمريرها متوافقة مع معلومات النوع المحددة في فئة ComponentSpec .

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

تجميع في خط أنابيب TFX

الخطوة الأخيرة هي توصيل المكون المخصص الجديد بخط أنابيب TFX. إلى جانب إضافة مثيل للمكون الجديد، هناك حاجة أيضًا إلى ما يلي:

  • قم بتوصيل المكونات الأولية والنهائية للمكون الجديد به بشكل صحيح. ويتم ذلك عن طريق الرجوع إلى مخرجات المكون الرئيسي في المكون الجديد والرجوع إلى مخرجات المكون الجديد في المكونات النهائية
  • أضف مثيل المكون الجديد إلى قائمة المكونات عند إنشاء المسار.

يسلط المثال أدناه الضوء على التغييرات المذكورة أعلاه. يمكن العثور على المثال الكامل في TFX GitHub repo .

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

نشر مكون مخصص بالكامل

إلى جانب تغييرات التعليمات البرمجية، يجب أن تكون جميع الأجزاء المضافة حديثًا ( ComponentSpec و Executor وComponent Interface) قابلة للوصول في بيئة تشغيل خط الأنابيب من أجل تشغيل خط الأنابيب بشكل صحيح.