การสร้างส่วนประกอบที่กำหนดเองอย่างเต็มที่

คู่มือนี้จะอธิบายวิธีใช้ TFX API เพื่อสร้างคอมโพเนนต์ที่กำหนดเองโดยสมบูรณ์ ส่วนประกอบที่กำหนดเองโดยสมบูรณ์ช่วยให้คุณสร้างส่วนประกอบโดยกำหนดข้อกำหนดส่วนประกอบ ตัวดำเนินการ และคลาสอินเทอร์เฟซส่วนประกอบ แนวทางนี้ช่วยให้คุณใช้ซ้ำและขยายส่วนประกอบมาตรฐานเพื่อให้เหมาะกับความต้องการของคุณได้

หากคุณยังใหม่กับไปป์ไลน์ TFX เรียนรู้เพิ่มเติมเกี่ยวกับแนวคิดหลักของไปป์ไลน์ TFX

ตัวดำเนินการแบบกำหนดเองหรือส่วนประกอบแบบกำหนดเอง

หากจำเป็นต้องใช้ตรรกะการประมวลผลแบบกำหนดเองเท่านั้นในขณะที่คุณสมบัติอินพุต เอาท์พุต และการดำเนินการของส่วนประกอบเหมือนกับส่วนประกอบที่มีอยู่ ตัวดำเนินการแบบกำหนดเองก็เพียงพอแล้ว จำเป็นต้องมีส่วนประกอบแบบกำหนดเองทั้งหมดเมื่อคุณสมบัติอินพุต เอาต์พุต หรือการดำเนินการใดๆ แตกต่างจากส่วนประกอบ TFX ที่มีอยู่

จะสร้างส่วนประกอบที่กำหนดเองได้อย่างไร?

การพัฒนาองค์ประกอบที่กำหนดเองโดยสมบูรณ์จำเป็นต้องมี:

  • ชุดข้อกำหนดเฉพาะของอินพุตและเอาท์พุตที่กำหนดไว้สำหรับส่วนประกอบใหม่ โดยเฉพาะอย่างยิ่ง ประเภทสำหรับสิ่งประดิษฐ์อินพุตควรสอดคล้องกับประเภทสิ่งประดิษฐ์เอาต์พุตของส่วนประกอบที่สร้างสิ่งประดิษฐ์ และประเภทของสิ่งประดิษฐ์เอาต์พุตควรสอดคล้องกับประเภทสิ่งประดิษฐ์อินพุตของส่วนประกอบที่ใช้สิ่งประดิษฐ์ หากมี
  • พารามิเตอร์การดำเนินการที่ไม่ใช่สิ่งประดิษฐ์ที่จำเป็นสำหรับส่วนประกอบใหม่

ส่วนประกอบSpec

คลาส ComponentSpec กำหนดสัญญาส่วนประกอบโดยการกำหนดส่วนอินพุตและเอาต์พุตให้กับส่วนประกอบตลอดจนพารามิเตอร์ที่ใช้สำหรับการดำเนินการส่วนประกอบ มีสามส่วน:

  • INPUTS : พจนานุกรมของพารามิเตอร์ที่พิมพ์สำหรับส่วนอินพุตที่ส่งผ่านไปยังตัวดำเนินการคอมโพเนนต์ โดยปกติแล้ว สิ่งประดิษฐ์อินพุตคือเอาต์พุตจากส่วนประกอบอัปสตรีม ดังนั้นจึงใช้ประเภทเดียวกันร่วมกัน
  • OUTPUTS : พจนานุกรมของพารามิเตอร์ที่พิมพ์สำหรับส่วนเอาต์พุตที่ส่วนประกอบสร้างขึ้น
  • PARAMETERS : พจนานุกรมของรายการ ExecutionParameter เพิ่มเติมที่จะถูกส่งผ่านไปยังตัวดำเนินการส่วนประกอบ พารามิเตอร์เหล่านี้ไม่ใช่พารามิเตอร์ที่เราต้องการกำหนดอย่างยืดหยุ่นในไปป์ไลน์ DSL และส่งต่อไปสู่การดำเนินการ

นี่คือตัวอย่างของ ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

ผู้ดำเนินการ

จากนั้น ให้เขียนโค้ดตัวดำเนินการสำหรับส่วนประกอบใหม่ โดยพื้นฐานแล้ว จะต้องสร้างคลาสย่อยใหม่ของ base_executor.BaseExecutor โดยมีฟังก์ชัน Do แทนที่ ในฟังก์ชัน Do อาร์กิวเมนต์ input_dict , output_dict และ exec_properties ที่ถูกส่งผ่านในการแมปไปยัง INPUTS , OUTPUTS และ PARAMETERS ที่กำหนดไว้ใน ComponentSpec ตามลำดับ สำหรับ exec_properties สามารถดึงค่าได้โดยตรงผ่านการค้นหาพจนานุกรม สำหรับสิ่งประดิษฐ์ใน input_dict และ output_dict มีฟังก์ชันอำนวยความสะดวกที่มีอยู่ในคลาส artifact_utils ที่สามารถใช้เพื่อดึงอินสแตนซ์สิ่งประดิษฐ์หรือสิ่งประดิษฐ์ uri

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

หน่วยทดสอบตัวดำเนินการแบบกำหนดเอง

การทดสอบหน่วยสำหรับตัวดำเนินการแบบกำหนดเองสามารถสร้างขึ้นได้คล้ายกับ การทดสอบนี้

อินเทอร์เฟซส่วนประกอบ

ตอนนี้ชิ้นส่วนที่ซับซ้อนที่สุดเสร็จสมบูรณ์แล้ว ขั้นตอนต่อไปคือการประกอบชิ้นส่วนเหล่านี้เข้ากับอินเทอร์เฟซส่วนประกอบ เพื่อให้สามารถใช้ส่วนประกอบในไปป์ไลน์ได้ มีหลายขั้นตอน:

  • ทำให้อินเทอร์เฟซคอมโพเนนต์เป็นคลาสย่อยของ base_component.BaseComponent
  • กำหนดตัวแปรคลาส SPEC_CLASS ด้วยคลาส ComponentSpec ที่กำหนดไว้ก่อนหน้านี้
  • กำหนดตัวแปรคลาส EXECUTOR_SPEC ด้วยคลาส Executor ที่กำหนดไว้ก่อนหน้านี้
  • กำหนดฟังก์ชันตัวสร้าง __init__() โดยใช้อาร์กิวเมนต์ของฟังก์ชันเพื่อสร้างอินสแตนซ์ของคลาส ComponentSpec และเรียกใช้ฟังก์ชัน super ด้วยค่านั้น พร้อมด้วยชื่อที่เป็นทางเลือก

เมื่ออินสแตนซ์ของส่วนประกอบถูกสร้างขึ้น ตรรกะการตรวจสอบประเภทในคลาส base_component.BaseComponent จะถูกเรียกใช้เพื่อให้แน่ใจว่าอาร์กิวเมนต์ที่ส่งผ่านเข้ากันได้กับข้อมูลประเภทที่กำหนดไว้ในคลาส ComponentSpec

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

ประกอบเป็นไปป์ไลน์ TFX

ขั้นตอนสุดท้ายคือการเสียบส่วนประกอบแบบกำหนดเองใหม่เข้ากับไปป์ไลน์ TFX นอกจากการเพิ่มอินสแตนซ์ของส่วนประกอบใหม่แล้ว ยังจำเป็นต้องมีสิ่งต่อไปนี้ด้วย:

  • เชื่อมต่อส่วนประกอบต้นน้ำและปลายน้ำของส่วนประกอบใหม่เข้ากับส่วนประกอบอย่างเหมาะสม สิ่งนี้ทำได้โดยการอ้างอิงเอาท์พุตของส่วนประกอบอัพสตรีมในส่วนประกอบใหม่ และอ้างอิงเอาท์พุตของส่วนประกอบใหม่ในส่วนประกอบดาวน์สตรีม
  • เพิ่มอินสแตนซ์ส่วนประกอบใหม่ลงในรายการส่วนประกอบเมื่อสร้างไปป์ไลน์

ตัวอย่างด้านล่างเน้นการเปลี่ยนแปลงที่กล่าวมาข้างต้น ตัวอย่างเต็มสามารถพบได้ใน TFX GitHub repo

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

ปรับใช้คอมโพเนนต์ที่กำหนดเองทั้งหมด

นอกจากการเปลี่ยนแปลงโค้ดแล้ว ส่วนที่เพิ่มใหม่ทั้งหมด ( ComponentSpec , Executor , อินเทอร์เฟซส่วนประกอบ) จะต้องสามารถเข้าถึงได้ในสภาพแวดล้อมการทำงานของไปป์ไลน์เพื่อที่จะรันไปป์ไลน์ได้อย่างถูกต้อง