Apache Beam และ TFX

Apache Beam ให้กรอบสำหรับการทำงานชุดและสตรีมมิ่งการประมวลผลข้อมูลงานที่ทำงานบนความหลากหลายของเครื่องมือการปฏิบัติ ไลบรารี TFX หลายแห่งใช้ Beam สำหรับการรันงาน ซึ่งช่วยให้สามารถปรับขนาดได้สูงในคลัสเตอร์การประมวลผล บีมรองรับเอ็นจิ้นการสั่งการหรือ "รันเนอร์" ที่หลากหลาย ซึ่งรวมถึงไดเร็ครันเนอร์ที่ทำงานบนโหนดคอมพิวท์เดี่ยวและมีประโยชน์มากสำหรับการพัฒนา การทดสอบ หรือการปรับใช้ขนาดเล็ก Beam มีเลเยอร์ที่เป็นนามธรรมซึ่งช่วยให้ TFX ทำงานบนรันเนอร์ที่รองรับโดยไม่ต้องแก้ไขโค้ด TFX ใช้ Beam Python API ดังนั้นจึงจำกัดเฉพาะนักวิ่งที่ได้รับการสนับสนุนโดย Python API

การปรับใช้และความสามารถในการปรับขนาด

เมื่อความต้องการปริมาณงานเพิ่มขึ้น Beam สามารถปรับขนาดการปรับใช้ที่มีขนาดใหญ่มากในคลัสเตอร์การประมวลผลขนาดใหญ่ได้ สิ่งนี้ถูกจำกัดโดยความสามารถในการปรับขนาดของนักวิ่งที่อยู่ด้านล่างเท่านั้น โดยทั่วไปแล้ว Runners ในการปรับใช้ขนาดใหญ่จะถูกปรับใช้กับระบบการจัดการคอนเทนเนอร์ เช่น Kubernetes หรือ Apache Mesos สำหรับการปรับใช้แอปพลิเคชัน การปรับขนาด และการจัดการโดยอัตโนมัติ

ดู Apache Beam เอกสารสำหรับข้อมูลเพิ่มเติมเกี่ยวกับบีมอาปาเช่

สำหรับผู้ใช้ Google Cloud, Dataflow คือวิ่งแนะนำซึ่งให้เป็นแพลตฟอร์มที่ serverless และค่าใช้จ่ายที่มีประสิทธิภาพผ่าน AutoScaling ของทรัพยากรการปรับสมดุลการทำงานแบบไดนามิกรวมลึกกับบริการอื่น ๆ ของ Google Cloud, built-in การรักษาความปลอดภัยและการตรวจสอบ

รหัส Python ที่กำหนดเองและการพึ่งพา

ความซับซ้อนที่โดดเด่นอย่างหนึ่งของการใช้ Beam ในไปป์ไลน์ TFX คือการจัดการโค้ดที่กำหนดเองและ/หรือการอ้างอิงที่จำเป็นจากโมดูล Python เพิ่มเติม ต่อไปนี้คือตัวอย่างบางส่วนที่อาจเป็นปัญหาได้:

  • preprocessing_fn ต้องอ้างอิงถึงโมดูล Python ของผู้ใช้เอง
  • ตัวแยกแบบกำหนดเองสำหรับส่วนประกอบ Evaluator
  • โมดูลที่กำหนดเองซึ่งจัดประเภทย่อยจากส่วนประกอบ TFX

TFX อาศัยการสนับสนุน Beam สำหรับ ผู้จัดการหลามไปป์ไลน์อ้างอิง ในการจัดการการอ้างอิงหลาม ขณะนี้มีสองวิธีในการจัดการสิ่งนี้:

  1. จัดเตรียมโค้ด Python และการอ้างอิงเป็นซอร์สแพ็คเกจ
  2. [กระแสข้อมูลเท่านั้น] การใช้อิมเมจคอนเทนเนอร์เป็นผู้ปฏิบัติงาน

เหล่านี้จะกล่าวถึงต่อไป

จัดเตรียมโค้ด Python และการอ้างอิงเป็นซอร์สแพ็คเกจ

ขอแนะนำสำหรับผู้ใช้ที่:

  1. คุ้นเคยกับบรรจุภัณฑ์ Python และ
  2. ใช้ซอร์สโค้ด Python เท่านั้น (เช่น ไม่มีโมดูล C หรือไลบรารีที่ใช้ร่วมกัน)

โปรดปฏิบัติตามอย่างใดอย่างหนึ่งของเส้นทางใน การจัดการหลามไปป์ไลน์อ้างอิง เพื่อให้นี้ใช้หนึ่งใน beam_pipeline_args ต่อไปนี้:

  • --setup_file
  • --extra_package
  • --requirements_file

หมายเหตุ: ในกรณีข้างต้นโปรดตรวจสอบให้แน่ใจว่ารุ่นเดียวกันของ tfx ถูกระบุว่าเป็นพึ่งพา

[กระแสข้อมูลเท่านั้น] การใช้อิมเมจคอนเทนเนอร์สำหรับผู้ปฏิบัติงาน

TFX 0.26.0 และเหนือมีการสนับสนุนการทดลองสำหรับการใช้ ภาพภาชนะที่กำหนดเอง สำหรับคนงาน Dataflow

เพื่อที่จะใช้สิ่งนี้ คุณต้อง:

  • สร้างภาพหางซึ่งมีทั้ง tfx และรหัสที่กำหนดเองของผู้ใช้และการอ้างอิงที่ติดตั้ง
    • สำหรับผู้ใช้ที่ (1) การใช้ tfx>=0.26 และ (2) ใช้หลาม 3.7 การพัฒนาระบบท่อส่งของพวกเขาวิธีที่ง่ายที่สุดที่จะทำคือการขยายรุ่นที่สอดคล้องกันอย่างเป็นทางการ tensorflow/tfx ภาพ:
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.

ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
  • พุชอิมเมจที่สร้างไปยังรีจิสตรีอิมเมจคอนเทนเนอร์ซึ่งสามารถเข้าถึงได้โดยโปรเจ็กต์ที่ใช้โดย Dataflow
    • ผู้ใช้ Google Cloud สามารถพิจารณาใช้ เมฆรูปร่าง ซึ่งเป็นอย่างดีโดยอัตโนมัติขั้นตอนข้างต้น
  • ให้ต่อไปนี้ beam_pipeline_args :
beam_pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project={project-id}',
    '--worker_harness_container_image={image-ref}',
    '--experiments=use_runner_v2',
])

TODO(b/171733562): ลบ use_runner_v2 เมื่อเป็นค่าเริ่มต้นสำหรับ Dataflow

สิ่งที่ต้องทำ (b / 179,738,639): สร้างเอกสารสำหรับวิธีการที่กำหนดเองภาชนะทดสอบในประเทศหลังจาก https://issues.apache.org/jira/browse/BEAM-5440

อาร์กิวเมนต์บีมไปป์ไลน์

ส่วนประกอบ TFX หลายอย่างอาศัย Beam สำหรับการประมวลผลข้อมูลแบบกระจาย พวกเขามีการกำหนดค่ากับ beam_pipeline_args ซึ่งระบุไว้ในช่วงระหว่างการสร้างท่อส่ง:

my_pipeline = Pipeline(
    ...,
    beam_pipeline_args=[...])

TFX 0.30 และเหนือเพิ่มอินเตอร์เฟซ with_beam_pipeline_args สำหรับการขยาย args ลำแสงระดับท่อต่อองค์ประกอบ:

example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])