Генерация больших наборов данных с помощью Apache Beam

Некоторые наборы данных слишком велики для обработки на одном компьютере. tfds поддерживает генерацию данных на многих машинах с помощью Apache Beam .

Этот документ состоит из двух разделов:

  • Для пользователей, которые хотят создать существующий набор данных Beam
  • Для разработчиков, которые хотят создать новый набор данных Beam.

Создание набора данных Beam

Ниже приведены различные примеры создания набора данных Beam как в облаке, так и локально.

В потоке данных Google Cloud

Чтобы запустить конвейер с помощью Google Cloud Dataflow и воспользоваться преимуществами распределенных вычислений, сначала следуйте инструкциям по быстрому запуску .

После настройки среды вы можете запустить CLI tfds build , используя каталог данных в GCS и указав необходимые параметры для флага --beam_pipeline_options .

Чтобы упростить запуск сценария, полезно определить следующие переменные, используя фактические значения для вашей настройки GCP/GCS и набора данных, который вы хотите сгенерировать:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Затем вам нужно будет создать файл, чтобы указать Dataflow установить tfds на рабочих:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Если вы используете tfds-nightly , обязательно используйте эхо-сигнал tfds-nightly , если набор данных был обновлен с момента последнего выпуска.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Если вы используете дополнительные зависимости, не включенные в библиотеку TFDS, следуйте инструкциям по управлению зависимостями конвейера Python .

Наконец, вы можете запустить задание, используя команду ниже:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Локально

Чтобы запустить скрипт локально с помощью средства запуска Apache Beam по умолчанию (он должен уместить все данные в памяти), команда такая же, как и для других наборов данных:

tfds build my_dataset

Чтобы запустить конвейер с помощью Apache Flink, вы можете прочитать официальную документацию . Убедитесь, что ваш Beam соответствует совместимости версий Flink.

Чтобы упростить запуск сценария, полезно определить следующие переменные, используя фактические значения для вашей настройки Flink и набора данных, который вы хотите сгенерировать:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Чтобы запустить задание во встроенном кластере Flink, вы можете запустить задание с помощью следующей команды:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

С пользовательским скриптом

Для создания набора данных в Beam используется тот же API, что и для других наборов данных. Вы можете настроить beam.Pipeline , используя аргументы beam_optionsbeam_runner ) DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Реализация набора данных Beam

Предварительные условия

Чтобы писать наборы данных Apache Beam, вы должны быть знакомы со следующими понятиями:

инструкции

Если вы знакомы с руководством по созданию набора данных , для добавления набора данных Beam требуется всего лишь изменить функцию _generate_examples . Функция должна возвращать объект балки, а не генератор:

Нелучевой набор данных:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Набор данных луча:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Все остальное может быть на 100% идентично, включая тесты.

Некоторые дополнительные соображения:

  • Используйте tfds.core.lazy_imports для импорта Apache Beam. Используя отложенную зависимость, пользователи по-прежнему могут читать набор данных после его создания без необходимости устанавливать Beam.
  • Будьте осторожны с замыканиями Python. При запуске конвейера функции beam.Map и beam.DoFn сериализуются с помощью pickle и отправляются всем воркерам. Не используйте изменяемые объекты внутри beam.PTransform , если состояние должно быть общим для всех рабочих.
  • Из-за того, что tfds.core.DatasetBuilder сериализуется с помощью Pickle, мутация tfds.core.DatasetBuilder во время создания данных будет игнорироваться в рабочих процессах (например, невозможно установить self.info.metadata['offset'] = 123 в _split_generators и получить к нему доступ от рабочих, например beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Если вам нужно разделить некоторые этапы конвейера между разделениями, вы можете добавить дополнительный pipeline: beam.Pipeline kwarg в _split_generator и управлять полным конвейером генерации. См. документацию _generate_examples для tfds.core.GeneratorBasedBuilder .

Пример

Вот пример набора данных Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Запуск вашего конвейера

Чтобы запустить конвейер, ознакомьтесь с приведенным выше разделом.

tfds build my_dataset --register_checksums

Конвейер, использующий TFDS в качестве входных данных

Если вы хотите создать лучевой конвейер, который принимает набор данных TFDS в качестве источника, вы можете использовать tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Он будет обрабатывать каждый фрагмент набора данных параллельно.