Menghasilkan kumpulan data besar dengan Apache Beam

Beberapa kumpulan data terlalu besar untuk diproses pada satu mesin. tfds mendukung pembuatan data di banyak mesin dengan menggunakan Apache Beam .

Dokumen ini memiliki dua bagian:

  • Untuk pengguna yang ingin membuat dataset Beam yang sudah ada
  • Bagi developer yang ingin membuat dataset Beam baru

Menghasilkan kumpulan data Beam

Di bawah ini adalah contoh berbeda dalam menghasilkan kumpulan data Beam, baik di cloud maupun secara lokal.

Di Aliran Data Google Cloud

Untuk menjalankan pipeline menggunakan Google Cloud Dataflow dan memanfaatkan komputasi terdistribusi, ikuti petunjuk Mulai Cepat terlebih dahulu.

Setelah lingkungan Anda disiapkan, Anda dapat menjalankan CLI tfds build menggunakan direktori data di GCS dan menentukan opsi yang diperlukan untuk flag --beam_pipeline_options .

Untuk mempermudah peluncuran skrip, sebaiknya tentukan variabel berikut menggunakan nilai sebenarnya untuk penyiapan GCP/GCS dan set data yang ingin Anda buat:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Anda kemudian perlu membuat file untuk memberi tahu Dataflow agar menginstal tfds pada pekerja:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Jika Anda menggunakan tfds-nightly , pastikan untuk melakukan echo dari tfds-nightly jika kumpulan data telah diperbarui sejak rilis terakhir.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Jika Anda menggunakan dependensi tambahan yang tidak disertakan dalam pustaka TFDS, ikuti petunjuk untuk mengelola dependensi pipa Python .

Terakhir, Anda dapat meluncurkan pekerjaan menggunakan perintah di bawah ini:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Secara lokal

Untuk menjalankan skrip Anda secara lokal menggunakan runner Apache Beam default (harus memuat semua data di memori), perintahnya sama seperti untuk kumpulan data lainnya:

tfds build my_dataset

Untuk menjalankan pipeline menggunakan Apache Flink Anda dapat membaca dokumentasi resminya . Pastikan Beam Anda mematuhi Kompatibilitas Versi Flink

Untuk mempermudah peluncuran skrip, sebaiknya tentukan variabel berikut menggunakan nilai sebenarnya untuk penyiapan Flink dan himpunan data yang ingin Anda hasilkan:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Untuk berjalan di klaster Flink tertanam, Anda dapat meluncurkan pekerjaan menggunakan perintah di bawah ini:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Dengan skrip khusus

Untuk menghasilkan dataset di Beam, API-nya sama dengan dataset lainnya. Anda dapat menyesuaikan beam.Pipeline menggunakan argumen beam_options (dan beam_runner ) dari DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Menerapkan kumpulan data Beam

Prasyarat

Untuk menulis kumpulan data Apache Beam, Anda harus memahami konsep berikut:

instruksi

Jika Anda sudah familiar dengan panduan pembuatan dataset , menambahkan dataset Beam hanya perlu memodifikasi fungsi _generate_examples . Fungsi tersebut harus mengembalikan objek beam, bukan generator:

Kumpulan data non-balok:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Kumpulan data balok:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Semua yang lain bisa 100% identik, termasuk tes.

Beberapa pertimbangan tambahan:

  • Gunakan tfds.core.lazy_imports untuk mengimpor Apache Beam. Dengan menggunakan ketergantungan malas, pengguna tetap dapat membaca dataset setelah dibuat tanpa harus menginstal Beam.
  • Hati-hati dengan penutupan Python. Saat menjalankan pipeline, fungsi beam.Map dan beam.DoFn diserialkan menggunakan pickle dan dikirim ke semua pekerja. Jangan gunakan objek yang bisa diubah di dalam beam.PTransform jika statusnya harus dibagikan ke seluruh pekerja.
  • Karena cara tfds.core.DatasetBuilder diserialkan dengan acar, mutasi tfds.core.DatasetBuilder selama pembuatan data akan diabaikan pada pekerja (misalnya, tidak mungkin menyetel self.info.metadata['offset'] = 123 di _split_generators dan mengaksesnya dari pekerja seperti beam.Map(lambda x: x + self.info.metadata['offset'])
  • Jika Anda perlu berbagi beberapa langkah pipeline di antara pemisahan, Anda dapat menambahkan pipeline: beam.Pipeline kwarg ke _split_generator dan mengontrol pipeline generasi penuh. Lihat dokumentasi _generate_examples dari tfds.core.GeneratorBasedBuilder .

Contoh

Berikut adalah contoh kumpulan data Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Menjalankan saluran pipa Anda

Untuk menjalankan pipeline, lihat bagian di atas.

tfds build my_dataset --register_checksums

Pipeline menggunakan TFDS sebagai input

Jika Anda ingin membuat beam pipeline yang menggunakan kumpulan data TFDS sebagai sumber, Anda dapat menggunakan tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Ini akan memproses setiap pecahan kumpulan data secara paralel.