ชุดข้อมูลบางชุดมีขนาดใหญ่เกินกว่าจะประมวลผลในเครื่องเดียวได้ tfds
รองรับการสร้างข้อมูลในหลาย ๆ เครื่องโดยใช้ Apache Beam
เอกสารนี้มีสองส่วน:
- สำหรับผู้ใช้ที่ต้องการสร้างชุดข้อมูล Beam ที่มีอยู่
- สำหรับนักพัฒนาที่ต้องการสร้างชุดข้อมูล Beam ใหม่
การสร้างชุดข้อมูล Beam
ด้านล่างนี้เป็นตัวอย่างต่างๆ ของการสร้างชุดข้อมูล Beam ทั้งบนคลาวด์หรือในเครื่อง
บน Google Cloud Dataflow
หากต้องการรันไปป์ไลน์โดยใช้ Google Cloud Dataflow และใช้ประโยชน์จากการคำนวณแบบกระจาย ขั้นแรกให้ทำตาม คำแนะนำการเริ่มต้นอย่างรวดเร็ว
เมื่อตั้งค่าสภาพแวดล้อมแล้ว คุณสามารถเรียกใช้ tfds build
CLI ได้โดยใช้ไดเรกทอรีข้อมูลบน GCS และระบุ ตัวเลือกที่จำเป็น สำหรับแฟล็ก --beam_pipeline_options
เพื่อให้เรียกใช้สคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า GCP/GCS และชุดข้อมูลที่คุณต้องการสร้าง
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
จากนั้น คุณจะต้องสร้างไฟล์เพื่อบอกให้ Dataflow ติดตั้ง tfds
ให้กับผู้ปฏิบัติงาน:
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
หากคุณใช้ tfds-nightly
ตรวจสอบให้แน่ใจว่าได้สะท้อนจาก tfds-nightly
ในกรณีที่ชุดข้อมูลได้รับการอัปเดตตั้งแต่รุ่นล่าสุด
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
หากคุณใช้การขึ้นต่อกันเพิ่มเติมที่ไม่รวมอยู่ในไลบรารี TFDS ให้ทำตาม คำแนะนำในการจัดการการขึ้นต่อกันของไปป์ไลน์ Python
สุดท้าย คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
ในท้องถิ่น
หากต้องการรันสคริปต์ของคุณภายในเครื่องโดยใช้ Apache Beam runner เริ่มต้น (ต้องพอดีกับข้อมูลทั้งหมดในหน่วยความจำ) คำสั่งจะเหมือนกับชุดข้อมูลอื่นๆ:
tfds build my_dataset
ด้วย Apache Flink
หากต้องการรันไปป์ไลน์โดยใช้ Apache Flink คุณสามารถอ่าน เอกสารอย่างเป็นทางการได้ ตรวจสอบให้แน่ใจว่า Beam ของคุณสอดคล้องกับ ความเข้ากันได้ของเวอร์ชัน Flink
เพื่อให้เปิดสคริปต์ได้ง่ายขึ้น คุณควรกำหนดตัวแปรต่อไปนี้โดยใช้ค่าจริงสำหรับการตั้งค่า Flink และชุดข้อมูลที่คุณต้องการสร้าง:
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
หากต้องการทำงานบนคลัสเตอร์ Flink แบบฝัง คุณสามารถเริ่มงานได้โดยใช้คำสั่งด้านล่าง:
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
ด้วยสคริปต์ที่กำหนดเอง
ในการสร้างชุดข้อมูลบน Beam นั้น API จะเหมือนกับชุดข้อมูลอื่นๆ คุณสามารถปรับแต่ง beam.Pipeline
ได้โดยใช้อาร์กิวเมนต์ beam_options
(และ beam_runner
) ของ DownloadConfig
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
การใช้ชุดข้อมูล Beam
ข้อกำหนดเบื้องต้น
ในการเขียนชุดข้อมูล Apache Beam คุณควรคุ้นเคยกับแนวคิดต่อไปนี้:
- ทำความคุ้นเคยกับ คู่มือการสร้างชุดข้อมูล
tfds
เนื่องจากเนื้อหาส่วนใหญ่ยังคงใช้กับชุดข้อมูล Beam - รับข้อมูลเบื้องต้นเกี่ยวกับ Apache Beam พร้อม คู่มือการเขียนโปรแกรม Beam
- หากคุณต้องการสร้างชุดข้อมูลโดยใช้ Cloud Dataflow โปรดอ่าน เอกสาร Google Cloud และ คู่มือการพึ่งพา Apache Beam
คำแนะนำ
หากคุณคุ้นเคยกับ คู่มือการสร้างชุดข้อมูล การเพิ่มชุดข้อมูล Beam จะต้องแก้ไขฟังก์ชัน _generate_examples
เท่านั้น ฟังก์ชั่นควรส่งคืนวัตถุลำแสงแทนที่จะเป็นตัวสร้าง:
ชุดข้อมูลที่ไม่ใช่ลำแสง:
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
ชุดข้อมูลบีม:
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
ส่วนที่เหลือทั้งหมดสามารถเหมือนกันได้ 100% รวมถึงการทดสอบด้วย
ข้อควรพิจารณาเพิ่มเติมบางประการ:
- ใช้
tfds.core.lazy_imports
เพื่อนำเข้า Apache Beam เมื่อใช้การพึ่งพาแบบ Lazy ผู้ใช้ยังสามารถอ่านชุดข้อมูลได้หลังจากที่ถูกสร้างขึ้นโดยไม่ต้องติดตั้ง Beam - ระวังการปิด Python เมื่อรันไปป์ไลน์ ฟังก์ชัน
beam.Map
และbeam.DoFn
จะถูกทำให้เป็นอนุกรมโดยใช้pickle
และส่งไปยังผู้ปฏิบัติงานทุกคน อย่าใช้วัตถุที่ไม่แน่นอนภายในbeam.PTransform
หากต้องมีการแบ่งปันสถานะระหว่างคนงาน - เนื่องจากวิธีที่
tfds.core.DatasetBuilder
ถูกทำให้เป็นอนุกรมด้วย Pickle การกลายพันธุ์tfds.core.DatasetBuilder
ในระหว่างการสร้างข้อมูลจะถูกละเว้นจากผู้ปฏิบัติงาน (เช่น ไม่สามารถตั้งค่าself.info.metadata['offset'] = 123
ใน_split_generators
และเข้าถึงได้จากคนงานเช่นbeam.Map(lambda x: x + self.info.metadata['offset'])
- หากคุณต้องการแชร์ขั้นตอนไปป์ไลน์ระหว่างการแยก คุณสามารถเพิ่ม
pipeline: beam.Pipeline
kwarg ถึง_split_generator
และควบคุมไปป์ไลน์การสร้างแบบเต็ม ดูเอกสาร_generate_examples
ของtfds.core.GeneratorBasedBuilder
ตัวอย่าง
นี่คือตัวอย่างของชุดข้อมูล Beam
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
ดำเนินการไปป์ไลน์ของคุณ
หากต้องการรันไปป์ไลน์ โปรดดูที่ส่วนด้านบน
tfds build my_dataset --register_checksums
ไปป์ไลน์โดยใช้ TFDS เป็นอินพุต
หากคุณต้องการสร้างบีมไปป์ไลน์ซึ่งใช้ชุดข้อมูล TFDS เป็นแหล่งที่มา คุณสามารถใช้ tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
โดยจะประมวลผลแต่ละส่วนของชุดข้อมูลแบบขนาน