อาปาเช่บีมและ TFX

Apache Beam จัดเตรียมเฟรมเวิร์กสำหรับการรันงานการประมวลผลข้อมูลแบบแบตช์และการสตรีมที่ทำงานบนกลไกการดำเนินการที่หลากหลาย ไลบรารี TFX หลายแห่งใช้ Beam เพื่อรันงาน ซึ่งทำให้มีความสามารถในการปรับขนาดในคลัสเตอร์การประมวลผลในระดับสูง Beam มีการรองรับเอ็นจิ้นการประมวลผลหรือ "รันเนอร์" ที่หลากหลาย รวมถึงไดเร็กรันเนอร์ที่ทำงานบนโหนดประมวลผลเดียว และมีประโยชน์มากสำหรับการพัฒนา การทดสอบ หรือการปรับใช้ขนาดเล็ก Beam จัดเตรียมเลเยอร์นามธรรมซึ่งช่วยให้ TFX สามารถทำงานบนรันเนอร์ที่รองรับโดยไม่ต้องแก้ไขโค้ด TFX ใช้ Beam Python API ดังนั้นจึงจำกัดเฉพาะรันเนอร์ที่ Python API รองรับ

การปรับใช้และความสามารถในการขยายขนาด

เนื่องจากข้อกำหนดด้านปริมาณงานเพิ่มขึ้น Beam จึงสามารถปรับขนาดไปสู่การใช้งานขนาดใหญ่มากทั่วทั้งคลัสเตอร์การประมวลผลขนาดใหญ่ สิ่งนี้ถูกจำกัดด้วยความสามารถในการปรับขนาดของนักวิ่งต้นแบบเท่านั้น โดยทั่วไปแล้วนักวิ่งในการปรับใช้ขนาดใหญ่จะถูกปรับใช้กับระบบการจัดการคอนเทนเนอร์เช่น Kubernetes หรือ Apache Mesos สำหรับการปรับใช้แอปพลิเคชันอัตโนมัติ การปรับขนาด และการจัดการ

ดูเอกสารประกอบ ของ Apache Beam สำหรับข้อมูลเพิ่มเติมเกี่ยวกับ Apache Beam

สำหรับผู้ใช้ Google Cloud นั้น Dataflow คือตัวดำเนินการที่แนะนำ ซึ่งมีแพลตฟอร์มแบบไร้เซิร์ฟเวอร์และคุ้มค่าผ่านการปรับขนาดทรัพยากรอัตโนมัติ การปรับสมดุลงานแบบไดนามิก การผสานรวมเชิงลึกกับบริการอื่นๆ ของ Google Cloud การรักษาความปลอดภัยในตัว และการตรวจสอบ

รหัส Python ที่กำหนดเองและการพึ่งพา

ความซับซ้อนที่โดดเด่นประการหนึ่งของการใช้ Beam ในไปป์ไลน์ TFX คือการจัดการโค้ดที่กำหนดเองและ/หรือการขึ้นต่อกันที่จำเป็นจากโมดูล Python เพิ่มเติม ต่อไปนี้คือตัวอย่างกรณีที่อาจเป็นปัญหา:

  • preprocessing_fn จำเป็นต้องอ้างอิงถึงโมดูล Python ของผู้ใช้เอง
  • ตัวแยกแบบกำหนดเองสำหรับส่วนประกอบ Evaluator
  • โมดูลที่กำหนดเองซึ่งมีคลาสย่อยจากส่วนประกอบ TFX

TFX อาศัยการสนับสนุนของ Beam ใน การจัดการการพึ่งพา Python Pipeline เพื่อจัดการการพึ่งพา Python ขณะนี้มีสองวิธีในการจัดการสิ่งนี้:

  1. การจัดเตรียมรหัส Python และการพึ่งพาเป็นแพ็คเกจซอร์ส
  2. [กระแสข้อมูลเท่านั้น] การใช้อิมเมจคอนเทนเนอร์เป็นผู้ปฏิบัติงาน

สิ่งเหล่านี้จะกล่าวถึงต่อไป

จัดเตรียมโค้ด Python และการพึ่งพาเป็นแพ็คเกจซอร์ส

ขอแนะนำสำหรับผู้ใช้ที่:

  1. คุ้นเคยกับแพ็คเกจ Python และ
  2. ใช้ซอร์สโค้ด Python เท่านั้น (เช่น ไม่มีโมดูล C หรือไลบรารีที่แบ่งใช้)

โปรดปฏิบัติตามหนึ่งในเส้นทางใน การจัดการการพึ่งพาไพพ์ไลน์ของ Python เพื่อระบุสิ่งนี้โดยใช้หนึ่งในลำแสงต่อไปนี้:

  • --setup_file
  • --extra_package
  • --requirements_file

หมายเหตุ: ในกรณีใดๆ ข้างต้น โปรดตรวจสอบให้แน่ใจว่า tfx เวอร์ชันเดียวกันนั้นอยู่ในรายการการอ้างอิง

[กระแสข้อมูลเท่านั้น] การใช้อิมเมจคอนเทนเนอร์สำหรับผู้ปฏิบัติงาน

TFX 0.26.0 ขึ้นไปมีการรองรับแบบทดลองสำหรับการใช้ อิมเมจคอนเทนเนอร์ที่กำหนดเอง สำหรับผู้ปฏิบัติงาน Dataflow

เพื่อที่จะใช้สิ่งนี้ คุณต้อง:

  • สร้างอิมเมจ Docker ซึ่งมีทั้ง tfx และโค้ดที่กำหนดเองของผู้ใช้และการอ้างอิงที่ติดตั้งไว้ล่วงหน้า
    • สำหรับผู้ใช้ที่ (1) ใช้ tfx>=0.26 และ (2) ใช้ python 3.7 เพื่อพัฒนาไปป์ไลน์ วิธีที่ง่ายที่สุดในการทำเช่นนี้คือการขยายเวอร์ชันที่สอดคล้องกันของอิมเมจ tensorflow/tfx อย่างเป็นทางการ:
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.

ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
  • พุชอิมเมจที่สร้างไปยังรีจิสทรีอิมเมจคอนเทนเนอร์ซึ่งโปรเจ็กต์ที่ Dataflow เข้าถึงได้
    • ผู้ใช้ Google Cloud สามารถพิจารณาใช้ Cloud Build ซึ่งดำเนินการตามขั้นตอนข้างต้นโดยอัตโนมัติ
  • ระบุ beam_pipeline_args ต่อไปนี้:
beam_pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project={project-id}',
    '--worker_harness_container_image={image-ref}',
    '--experiments=use_runner_v2',
])

TODO(b/171733562): ลบ use_runner_v2 เมื่อเป็นค่าเริ่มต้นสำหรับ Dataflow

สิ่งที่ต้องทำ (b/179738639): สร้างเอกสารสำหรับวิธีทดสอบคอนเทนเนอร์แบบกำหนดเองในเครื่องหลังจาก https://issues.apache.org/jira/browse/BEAM-5440

อาร์กิวเมนต์ไปป์ไลน์ของบีม

ส่วนประกอบ TFX หลายอย่างอาศัย Beam สำหรับการประมวลผลข้อมูลแบบกระจาย มีการกำหนดค่าด้วย beam_pipeline_args ซึ่งระบุไว้ระหว่างการสร้างไปป์ไลน์:

my_pipeline = Pipeline(
    ...,
    beam_pipeline_args=[...])

TFX 0.30 และสูงกว่าเพิ่มอินเทอร์เฟซ with_beam_pipeline_args สำหรับการขยาย args ลำแสงระดับไปป์ไลน์ต่อส่วนประกอบ:

example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])