Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Generowanie dużych zbiorów danych za pomocą Apache Beam

Niektóre zbiory danych są zbyt duże, aby można je było przetworzyć na jednym komputerze. tfds obsługuje generowanie danych na wielu maszynach przy użyciu Apache Beam .

Ten dokument ma dwie sekcje:

  • Dla użytkowników, którzy chcą wygenerować istniejący zestaw danych Beam
  • Dla programistów, którzy chcą stworzyć nowy zestaw danych Beam

Spis treści:

Generowanie zestawu danych Beam

Poniżej znajdują się różne przykłady generowania zestawu danych Beam, zarówno w chmurze, jak i lokalnie.

W Google Cloud Dataflow

Aby uruchomić potok za pomocą Google Cloud Dataflow i skorzystać z obliczeń rozproszonych, najpierw wykonaj instrukcje szybkiego startu.

Po skonfigurowaniu środowiska możesz uruchomić skrypt download_and_prepare przy użyciu katalogu danych w GCS i określając wymagane opcje dla flagi --beam_pipeline_options .

Aby ułatwić uruchamianie skryptu, warto zdefiniować następujące zmienne na podstawie rzeczywistych wartości konfiguracji GCP / GCS i zbioru danych, który chcesz wygenerować:

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

Będziesz wtedy musiał utworzyć plik, aby poinformować Dataflow o zainstalowaniu tfds na pracownikach:

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

Jeśli używasz tfds-nightly , pamiętaj, aby wyświetlić echo z tfds-nightly w przypadku, gdy zestaw danych został zaktualizowany od ostatniego wydania.

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

Na koniec możesz uruchomić zadanie za pomocą poniższego polecenia:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

Lokalnie

Aby uruchomić skrypt lokalnie przy użyciu domyślnego programu uruchamiającego Apache Beam, polecenie jest takie samo, jak w przypadku innych zestawów danych:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

Z niestandardowym skryptem

Aby wygenerować zestaw danych w Beam, interfejs API jest taki sam jak w przypadku innych zestawów danych, ale musisz przekazać opcje Beam lub element prowadzący do DownloadConfig .

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

Implementacja zestawu danych Beam

Wymagania wstępne

Aby pisać zestawy danych Apache Beam, należy zapoznać się z następującymi pojęciami:

Instrukcje

Jeśli znasz przewodnik po tworzeniu zestawu danych , dodanie zestawu danych belki wymaga tylko kilku modyfikacji:

  • Twój DatasetBuilder będzie dziedziczyć po tfds.core.BeamBasedBuilder zamiast tfds.core.GeneratorBasedBuilder .
  • _build_pcollection(self, **kwargs) danych belek powinny implementować metodę abstrakcyjną _build_pcollection(self, **kwargs) zamiast metody _generate_examples(self, **kwargs) . _build_pcollection powinien zwrócić beam.PCollection z przykładami skojarzonymi z podziałem.
  • Pisanie testu jednostkowego dla zestawu danych Beam przebiega tak samo, jak w przypadku innych zestawów danych.

Dodatkowe uwagi:

  • Użyj tfds.core.lazy_imports aby zaimportować Apache Beam. Korzystając z leniwej zależności, użytkownicy mogą nadal odczytywać zestaw danych po jego wygenerowaniu bez konieczności instalowania Beam.
  • Uważaj na domknięcia w Pythonie. Podczas uruchamiania rurociągu funkcje beam.Map i beam.DoFn są serializowane za pomocą pickle i wysyłane do wszystkich pracowników. Może to powodować błędy; na przykład, jeśli używasz mutowalnego obiektu w swoich funkcjach, który został zadeklarowany poza funkcją, możesz napotkać błędy pickle lub nieoczekiwane zachowanie. Rozwiązaniem jest zazwyczaj uniknięcie mutacji zamkniętych obiektów.
  • Używanie metod w DatasetBuilder w potoku Beam jest w porządku. Jednak sposób, w jaki klasa jest serializowana podczas wytrawiania, zmiany wprowadzone w funkcjach podczas tworzenia będą w najlepszym przypadku ignorowane.

Przykład

Oto przykład zestawu danych Beam. Aby uzyskać bardziej skomplikowany, rzeczywisty przykład, spójrz na zbiór danych Wikipedia .

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

Uruchomienie rurociągu

Aby uruchomić potok, spójrz na powyższą sekcję.

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset