เข้าร่วมชุมชน SIG TFX-Addons และช่วยปรับปรุง TFX ให้ดียิ่งขึ้น!

การสร้างส่วนประกอบที่กำหนดเองอย่างเต็มที่

คู่มือนี้อธิบายวิธีใช้ TFX API เพื่อสร้างส่วนประกอบที่กำหนดเองทั้งหมด ส่วนประกอบที่กำหนดเองทั้งหมดช่วยให้คุณสร้างส่วนประกอบโดยการกำหนดข้อกำหนดส่วนประกอบ ตัวดำเนินการ และคลาสส่วนต่อประสานส่วนประกอบ วิธีนี้ช่วยให้คุณใช้ซ้ำและขยายองค์ประกอบมาตรฐานให้เหมาะกับความต้องการของคุณได้

ถ้าคุณยังใหม่กับท่อ TFX, เรียนรู้เพิ่มเติมเกี่ยวกับแนวคิดหลักของท่อ TFX

ตัวดำเนินการที่กำหนดเองหรือส่วนประกอบที่กำหนดเอง

ถ้าจำเป็นต้องใช้เฉพาะตรรกะการประมวลผลแบบกำหนดเองในขณะที่คุณสมบัติอินพุต เอาต์พุต และการดำเนินการของส่วนประกอบเหมือนกับส่วนประกอบที่มีอยู่ ตัวดำเนินการแบบกำหนดเองก็เพียงพอแล้ว จำเป็นต้องมีส่วนประกอบที่กำหนดเองอย่างสมบูรณ์เมื่อคุณสมบัติอินพุต เอาต์พุต หรือการดำเนินการใดๆ แตกต่างจากส่วนประกอบ TFX ที่มีอยู่

จะสร้างองค์ประกอบที่กำหนดเองได้อย่างไร?

การพัฒนาองค์ประกอบที่กำหนดเองอย่างสมบูรณ์ต้องการ:

  • ชุดข้อมูลจำเพาะของสิ่งประดิษฐ์อินพุตและเอาต์พุตที่กำหนดไว้สำหรับส่วนประกอบใหม่ โดยเฉพาะอย่างยิ่ง ประเภทของสิ่งประดิษฐ์อินพุตควรสอดคล้องกับประเภทสิ่งประดิษฐ์เอาต์พุตของส่วนประกอบที่สร้างสิ่งประดิษฐ์ และประเภทสำหรับสิ่งประดิษฐ์เอาต์พุตควรสอดคล้องกับประเภทสิ่งประดิษฐ์อินพุตของส่วนประกอบที่ใช้สิ่งประดิษฐ์ หากมี
  • พารามิเตอร์การดำเนินการที่ไม่ใช่สิ่งประดิษฐ์ที่จำเป็นสำหรับส่วนประกอบใหม่

ส่วนประกอบSpec

ComponentSpec ระดับที่กำหนดในสัญญาโดยกำหนดองค์ประกอบเข้าและส่งออกสิ่งประดิษฐ์ส่วนประกอบเช่นเดียวกับพารามิเตอร์ที่จะใช้สำหรับการดำเนินการองค์ประกอบ มีสามส่วนในนั้น:

  • ปัจจัยการผลิต: พจนานุกรมของพารามิเตอร์พิมพ์สำหรับสิ่งประดิษฐ์การป้อนข้อมูลที่มีเข้ามาในปฏิบัติการส่วนประกอบ โดยปกติสิ่งประดิษฐ์อินพุตเป็นเอาต์พุตจากส่วนประกอบต้นน้ำและใช้ประเภทเดียวกันร่วมกัน
  • เอาท์พุท: พจนานุกรมของพารามิเตอร์พิมพ์สำหรับสิ่งประดิษฐ์ที่ส่งออกซึ่งส่วนประกอบที่ผลิต
  • พารามิเตอร์: พจนานุกรมเพิ่มเติม ExecutionParameter รายการที่จะผ่านเข้าสู่ปฏิบัติการส่วนประกอบ เหล่านี้เป็นพารามิเตอร์ที่ไม่ใช่อาร์ติแฟกต์ที่เราต้องการกำหนดอย่างยืดหยุ่นในไพพ์ไลน์ DSL และส่งต่อไปยังการดำเนินการ

นี่คือตัวอย่างของ ComponentSpec:

class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }

เพชฌฆาต

ถัดไป เขียนโค้ดตัวดำเนินการสำหรับคอมโพเนนต์ใหม่ โดยทั่วไปเป็น subclass ใหม่ base_executor.BaseExecutor จะต้องมีการสร้างขึ้นด้วย Do ฟังก์ชั่น overriden ในการ Do หน้าที่การขัดแย้ง input_dict , output_dict และ exec_properties ที่ส่งผ่านในแผนที่ไปยัง INPUTS , OUTPUTS และ PARAMETERS ที่กำหนดไว้ใน ComponentSpec ตามลำดับ สำหรับ exec_properties ค่าสามารถเรียกได้โดยตรงผ่านการค้นหาพจนานุกรม สำหรับสิ่งประดิษฐ์ใน input_dict และ output_dict มีฟังก์ชั่นที่สะดวกสบายที่มีอยู่ใน artifact_utils ระดับที่สามารถใช้ในการดึงข้อมูลเช่นสิ่งประดิษฐ์หรือสิ่งประดิษฐ์ URI

class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...

    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)

หน่วยทดสอบตัวดำเนินการแบบกำหนดเอง

การทดสอบหน่วยปฏิบัติการที่กำหนดเองสามารถสร้างคล้ายกับ คนนี้

อินเทอร์เฟซส่วนประกอบComponent

เมื่อส่วนที่ซับซ้อนที่สุดเสร็จสมบูรณ์แล้ว ขั้นตอนต่อไปคือการประกอบชิ้นส่วนเหล่านี้เข้ากับอินเทอร์เฟซส่วนประกอบ เพื่อให้สามารถใช้ส่วนประกอบในไปป์ไลน์ได้ มีหลายขั้นตอน:

  • ทำให้องค์ประกอบอินเตอร์เฟซคลาสย่อยของ base_component.BaseComponent
  • กำหนดตัวแปรระดับ SPEC_CLASS กับ ComponentSpec ระดับที่กำหนดไว้ก่อนหน้านี้
  • กำหนดตัวแปรระดับ EXECUTOR_SPEC กับระดับผู้ปฏิบัติการที่ได้รับการกำหนดไว้ก่อนหน้านี้
  • กําหนดการ __init__() ฟังก์ชั่นคอนสตรัคโดยใช้การขัดแย้งกับการทำงานเพื่อสร้างอินสแตนซ์ของคลาส ComponentSpec และเรียกฟังก์ชั่น super กับค่าที่พร้อมกับชื่อตัวเลือก

เมื่อเป็นตัวอย่างขององค์ประกอบที่จะถูกสร้างขึ้นพิมพ์ตรวจสอบตรรกะใน base_component.BaseComponent ระดับจะถูกเรียกเพื่อให้มั่นใจว่าข้อโต้แย้งที่ถูกส่งผ่านไปในเข้ากันได้กับข้อมูลประเภทที่กำหนดไว้ใน ComponentSpec ระดับ

from tfx.types import standard_artifacts
from hello_component import executor

class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: types.Channel = None,
               output_data: types.Channel = None,
               name: Optional[Text] = None):
    if not output_data:
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)

ประกอบเป็นท่อส่ง TFX

ขั้นตอนสุดท้ายคือการเสียบส่วนประกอบแบบกำหนดเองใหม่เข้ากับไปป์ไลน์ TFX นอกจากการเพิ่มอินสแตนซ์ขององค์ประกอบใหม่แล้ว ยังจำเป็นต้องมีสิ่งต่อไปนี้:

  • วางสายส่วนประกอบต้นน้ำและปลายน้ำของส่วนประกอบใหม่อย่างถูกต้อง ทำได้โดยอ้างอิงผลลัพธ์ของส่วนประกอบต้นน้ำในส่วนประกอบใหม่ และอ้างอิงผลลัพธ์ของส่วนประกอบใหม่ในส่วนประกอบปลายทาง
  • เพิ่มอินสแตนซ์ส่วนประกอบใหม่ในรายการส่วนประกอบเมื่อสร้างไปป์ไลน์

ตัวอย่างด้านล่างเน้นการเปลี่ยนแปลงดังกล่าว ตัวอย่างเต็มรูปแบบที่สามารถพบได้ใน repo TFX GitHub

def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )

ปรับใช้องค์ประกอบที่กำหนดเองอย่างเต็มที่

ข้างเปลี่ยนแปลงรหัสทุกชิ้นส่วนที่เพิ่มใหม่ ( ComponentSpec , Executor อินเตอร์เฟซ Component) จำเป็นที่จะต้องสามารถเข้าถึงได้ในท่อทำงานสภาพแวดล้อมในการทำงานท่อได้อย่างถูกต้อง