Trang này được dịch bởi Cloud Translation API.
Switch to English

Tạo tập dữ liệu lớn với Apache Beam

Một số bộ dữ liệu quá lớn để được xử lý trên một máy. tfds hỗ trợ tạo dữ liệu trên nhiều máy bằng cách sử dụng Apache Beam .

Tài liệu này có hai phần:

  • Đối với người dùng muốn tạo tập dữ liệu Beam hiện có
  • Dành cho các nhà phát triển muốn tạo tập dữ liệu Beam mới

Tạo tập dữ liệu Beam

Dưới đây là các ví dụ khác nhau về cách tạo tập dữ liệu Beam, cả trên đám mây hoặc cục bộ.

Trên Google Cloud Dataflow

Để chạy quy trình bằng Google Cloud Dataflow và tận dụng tính toán phân tán, trước tiên hãy làm theo hướng dẫn Bắt đầu nhanh .

Khi môi trường của bạn được thiết lập, bạn có thể chạy CLI bản tfds build bằng cách sử dụng thư mục dữ liệu trên GCS và chỉ định các tùy chọn cần thiết cho cờ --beam_pipeline_options .

Để khởi chạy tập lệnh dễ dàng hơn, bạn nên xác định các biến sau bằng cách sử dụng các giá trị thực tế cho thiết lập GCP / GCS và tập dữ liệu bạn muốn tạo:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Sau đó, bạn sẽ cần tạo một tệp để yêu cầu Dataflow cài đặt tfds trên tfds :

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Nếu bạn đang sử dụng tfds-nightly , hãy đảm bảo gửi lại từ tfds-nightly trong trường hợp tập dữ liệu đã được cập nhật kể từ lần phát hành cuối cùng.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Cuối cùng, bạn có thể khởi chạy công việc bằng lệnh dưới đây:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Cục bộ

Để chạy tập lệnh của bạn cục bộ bằng trình chạy Apache Beam mặc định, lệnh này cũng giống như đối với các tập dữ liệu khác:

tfds build my_dataset

Với một tập lệnh tùy chỉnh

Để tạo tập dữ liệu trên Beam, API cũng giống như các tập dữ liệu khác. Bạn có thể tùy chỉnh beam.Pipeline bằng cách sử dụng các đối số beam_options (và beam_runner ) của DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Triển khai tập dữ liệu Beam

Điều kiện tiên quyết

Để viết tập dữ liệu Apache Beam, bạn nên nắm rõ các khái niệm sau:

Hướng dẫn

Nếu bạn đã quen với hướng dẫn tạo tập dữ liệu , thì việc thêm tập dữ liệu Beam chỉ yêu cầu sửa đổi hàm _generate_examples . Hàm sẽ trả về một đối tượng chùm chứ không phải một trình tạo:

Bộ dữ liệu không chùm:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Bộ dữ liệu chùm:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Tất cả phần còn lại có thể giống nhau 100%, bao gồm cả các bài kiểm tra.

Một số cân nhắc bổ sung:

  • Sử dụng tfds.core.lazy_imports để nhập Apache Beam. Bằng cách sử dụng sự phụ thuộc lười biếng, người dùng vẫn có thể đọc tập dữ liệu sau khi nó được tạo mà không cần phải cài đặt Beam.
  • Hãy cẩn thận với việc đóng Python. Khi chạy các đường ống dẫn, các beam.Mapbeam.DoFn chức năng được đăng bằng pickle và gửi đến tất cả người lao động. Không thay đổi các đối tượng bên trong một beam.PTransform nếu trạng thái phải được chia sẻ giữa các beam.PTransform .
  • Do cách tfds.core.DatasetBuilder được tuần tự hóa với pickle, việc thay đổi tfds.core.DatasetBuilder trong quá trình tạo dữ liệu sẽ bị bỏ qua trên tfds.core.DatasetBuilder (ví dụ: không thể đặt self.info.metadata['offset'] = 123 trong _split_generators và truy cập nó từ các beam.Map(lambda x: x + self.info.metadata['offset']) như beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Nếu bạn cần chia sẻ một số bước đường ống giữa các phần tách, bạn có thể thêm một pipeline: beam.Pipeline bổ sung pipeline: beam.Pipeline kwarg vào _split_generator và kiểm soát đường ống thế hệ đầy đủ. Xem _generate_examples tài liệu của tfds.core.GeneratorBasedBuilder .

Thí dụ

Đây là một ví dụ về tập dữ liệu Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Chạy đường ống của bạn

Để chạy đường ống, hãy xem phần trên.

tfds build my_dataset --register_checksums