Эта страница была переведа с помощью Cloud Translation API.
Switch to English

Создание больших наборов данных с помощью Apache Beam

Некоторые наборы данных слишком велики для обработки на одном компьютере. tfds поддерживает создание данных на многих машинах с помощью Apache Beam .

Этот документ состоит из двух разделов:

  • Для пользователей, которые хотят создать существующий набор данных Beam
  • Для разработчиков, которые хотят создать новый набор данных Beam

Создание набора данных Beam

Ниже приведены различные примеры создания набора данных Beam как в облаке, так и локально.

В Google Cloud Dataflow

Чтобы запустить конвейер с помощью Google Cloud Dataflow и воспользоваться преимуществами распределенных вычислений, сначала следуйте инструкциям по быстрому запуску .

После настройки среды вы можете запустить tfds build командной строки tfds build используя каталог данных на GCS и указав необходимые параметры для флага --beam_pipeline_options .

Чтобы упростить запуск скрипта, полезно определить следующие переменные, используя фактические значения для вашей настройки GCP / GCS и набора данных, который вы хотите сгенерировать:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Затем вам нужно будет создать файл, чтобы сообщить Dataflow об установке tfds на рабочих:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Если вы используете tfds-nightly , убедитесь, что это эхо от tfds-nightly на случай, если набор данных был обновлен с момента последней версии.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Наконец, вы можете запустить задание, используя следующую команду:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Локально

Чтобы запустить ваш скрипт локально, используя средство запуска Apache Beam по умолчанию, команда такая же, как и для других наборов данных:

tfds build my_dataset

С кастомным скриптом

Для создания набора данных в Beam API такой же, как и для других наборов данных. Вы можете настроить beam.Pipeline используя beam_optionsbeam_runner ) DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Реализация набора данных Beam

Предпосылки

Чтобы писать наборы данных Apache Beam, вы должны быть знакомы со следующими концепциями:

инструкции

Если вы знакомы с руководством по созданию набора данных , добавление набора данных Beam требует только изменения функции _generate_examples . Функция должна возвращать объект луча, а не генератор:

Набор данных без луча:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Набор данных балки:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Все остальное может быть на 100% идентичным, включая тесты.

Некоторые дополнительные соображения:

  • Используйте tfds.core.lazy_imports для импорта Apache Beam. Используя ленивую зависимость, пользователи могут читать набор данных после того, как он был сгенерирован, без необходимости установки Beam.
  • Будьте осторожны с закрытием Python. При запуске конвейера функции beam.Map и beam.DoFn сериализуются с помощью pickle и отправляются всем рабочим. Не beam.PTransform объекты внутри beam.PTransform если состояние должно быть разделено между рабочими.
  • Из-за способа сериализации tfds.core.DatasetBuilder с помощью pickle изменение tfds.core.DatasetBuilder во время создания данных будет игнорироваться рабочими процессами (например, невозможно установить self.info.metadata['offset'] = 123 в _split_generators и получить к нему доступ из рабочих, таких как beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Если вам нужно разделить некоторые этапы конвейера между разделениями, вы можете добавить дополнительный pipeline: beam.Pipeline kwarg в _split_generator и контролировать весь конвейер генерации. См _generate_examples документации tfds.core.GeneratorBasedBuilder .

пример

Вот пример набора данных Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Запуск вашего конвейера

Чтобы запустить конвейер, взгляните на приведенный выше раздел.

tfds build my_dataset --register_checksums