การสร้างชุดข้อมูลขนาดใหญ่ด้วย Apache Beam

ชุดข้อมูลบางชุดมีขนาดใหญ่เกินกว่าจะประมวลผลในเครื่องเดียวได้ tfds รองรับการสร้างข้อมูลในหลาย ๆ เครื่องโดยใช้ Apache Beam

เอกสารนี้มีสองส่วน:

  • สำหรับผู้ใช้ที่ต้องการสร้างชุดข้อมูล Beam ที่มีอยู่
  • สำหรับนักพัฒนาที่ต้องการสร้างชุดข้อมูล Beam ใหม่

การสร้างชุดข้อมูล Beam

ด้านล่างนี้เป็นตัวอย่างต่างๆ ของการสร้างชุดข้อมูล Beam ทั้งบนคลาวด์หรือในเครื่อง

บน Google Cloud Dataflow

หากต้องการรันไปป์ไลน์โดยใช้ Google Cloud Dataflow และใช้ประโยชน์จากการคำนวณแบบกระจาย ขั้นแรกให้ทำตาม คำแนะนำการเริ่มต้นอย่างรวดเร็ว

เมื่อตั้งค่าสภาพแวดล้อมแล้ว คุณสามารถเรียกใช้ tfds build CLI ได้โดยใช้ไดเรกทอรีข้อมูลบน GCS และระบุ ตัวเลือกที่จำเป็น สำหรับแฟล็ก --beam_pipeline_options

เพื่อให้เรียกใช้สคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า GCP/GCS และชุดข้อมูลที่คุณต้องการสร้าง

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

จากนั้น คุณจะต้องสร้างไฟล์เพื่อบอกให้ Dataflow ติดตั้ง tfds ให้กับผู้ปฏิบัติงาน:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

หากคุณใช้ tfds-nightly ตรวจสอบให้แน่ใจว่าได้สะท้อนจาก tfds-nightly ในกรณีที่ชุดข้อมูลได้รับการอัปเดตตั้งแต่รุ่นล่าสุด

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

หากคุณใช้การขึ้นต่อกันเพิ่มเติมที่ไม่รวมอยู่ในไลบรารี TFDS ให้ทำตาม คำแนะนำในการจัดการการขึ้นต่อกันของไปป์ไลน์ Python

สุดท้าย คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

ในท้องถิ่น

หากต้องการรันสคริปต์ของคุณภายในเครื่องโดยใช้ Apache Beam runner เริ่มต้น (ต้องพอดีกับข้อมูลทั้งหมดในหน่วยความจำ) คำสั่งจะเหมือนกับชุดข้อมูลอื่นๆ:

tfds build my_dataset

หากต้องการรันไปป์ไลน์โดยใช้ Apache Flink คุณสามารถอ่าน เอกสารอย่างเป็นทางการได้ ตรวจสอบให้แน่ใจว่า Beam ของคุณสอดคล้องกับ ความเข้ากันได้ของเวอร์ชัน Flink

เพื่อให้เปิดสคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า Flink และชุดข้อมูลที่คุณต้องการสร้าง:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

หากต้องการทำงานบนคลัสเตอร์ Flink แบบฝัง คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

ด้วยสคริปต์ที่กำหนดเอง

ในการสร้างชุดข้อมูลบน Beam นั้น API จะเหมือนกับชุดข้อมูลอื่นๆ คุณสามารถปรับแต่ง beam.Pipeline ได้โดยใช้อาร์กิวเมนต์ beam_options (และ beam_runner ) ของ DownloadConfig

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

การใช้ชุดข้อมูล Beam

ข้อกำหนดเบื้องต้น

ในการเขียนชุดข้อมูล Apache Beam คุณควรคุ้นเคยกับแนวคิดต่อไปนี้:

คำแนะนำ

หากคุณคุ้นเคยกับ คู่มือการสร้างชุดข้อมูล การเพิ่มชุดข้อมูล Beam จะต้องแก้ไขฟังก์ชัน _generate_examples เท่านั้น ฟังก์ชั่นควรส่งคืนวัตถุลำแสงแทนที่จะเป็นตัวสร้าง:

ชุดข้อมูลที่ไม่ใช่ลำแสง:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

ชุดข้อมูลบีม:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

ส่วนที่เหลือทั้งหมดสามารถเหมือนกันได้ 100% รวมถึงการทดสอบด้วย

ข้อควรพิจารณาเพิ่มเติมบางประการ:

  • ใช้ tfds.core.lazy_imports เพื่อนำเข้า Apache Beam เมื่อใช้การพึ่งพาแบบ Lazy ผู้ใช้ยังสามารถอ่านชุดข้อมูลได้หลังจากที่ถูกสร้างขึ้นโดยไม่ต้องติดตั้ง Beam
  • ระวังการปิด Python เมื่อรันไปป์ไลน์ ฟังก์ชัน beam.Map และ beam.DoFn จะถูกทำให้เป็นอนุกรมโดยใช้ pickle และส่งไปยังผู้ปฏิบัติงานทุกคน อย่าใช้วัตถุที่ไม่แน่นอนภายใน beam.PTransform หากต้องมีการแบ่งปันสถานะระหว่างคนงาน
  • เนื่องจากวิธีที่ tfds.core.DatasetBuilder ถูกทำให้เป็นอนุกรมด้วย Pickle การกลายพันธุ์ tfds.core.DatasetBuilder ในระหว่างการสร้างข้อมูลจะถูกละเว้นจากผู้ปฏิบัติงาน (เช่น ไม่สามารถตั้งค่า self.info.metadata['offset'] = 123 ใน _split_generators และเข้าถึงได้จากคนงานเช่น beam.Map(lambda x: x + self.info.metadata['offset'])
  • หากคุณต้องการแชร์ขั้นตอนไปป์ไลน์ระหว่างการแยก คุณสามารถเพิ่ม pipeline: beam.Pipeline kwarg ถึง _split_generator และควบคุมไปป์ไลน์การสร้างแบบเต็ม ดูเอกสาร _generate_examples ของ tfds.core.GeneratorBasedBuilder

ตัวอย่าง

นี่คือตัวอย่างของชุดข้อมูล Beam

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

ดำเนินการไปป์ไลน์ของคุณ

หากต้องการรันไปป์ไลน์ โปรดดูที่ส่วนด้านบน

tfds build my_dataset --register_checksums

ไปป์ไลน์โดยใช้ TFDS เป็นอินพุต

หากคุณต้องการสร้างบีมไปป์ไลน์ซึ่งใช้ชุดข้อมูล TFDS เป็นแหล่งที่มา คุณสามารถใช้ tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

โดยจะประมวลผลแต่ละส่วนของชุดข้อมูลแบบขนาน