Diese Seite wurde von der Cloud Translation API übersetzt.
Switch to English

Generieren großer Datenmengen mit Apache Beam

Einige Datensätze sind zu groß, um auf einem einzelnen Computer verarbeitet zu werden. tfds unterstützt das Generieren von Daten auf vielen Computern mithilfe von Apache Beam .

Dieses Dokument besteht aus zwei Abschnitten:

  • Für Benutzer, die ein vorhandenes Beam-Dataset generieren möchten
  • Für Entwickler, die einen neuen Beam-Datensatz erstellen möchten

Inhaltsverzeichnis:

Generieren eines Beam-Datasets

Im Folgenden finden Sie verschiedene Beispiele zum Generieren eines Beam-Datasets, sowohl in der Cloud als auch lokal.

In Google Cloud Dataflow

Befolgen Sie zunächst die Schnellstartanweisungen, um die Pipeline mit Google Cloud Dataflow auszuführen und die verteilte Berechnung zu nutzen.

Sobald Ihre Umgebung eingerichtet ist, können Sie das Skript download_and_prepare mithilfe eines Datenverzeichnisses in GCS --beam_pipeline_options und die erforderlichen Optionen für das Flag --beam_pipeline_options .

Um das Starten des Skripts zu vereinfachen, ist es hilfreich, die folgenden Variablen anhand der tatsächlichen Werte für Ihr GCP / GCS-Setup und des zu generierenden Datasets zu definieren:

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

Anschließend müssen Sie eine Datei erstellen, um tfds auf den tfds zu installieren:

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

Wenn Sie mit tfds-nightly , stellen Sie sicher von Echo tfds-nightly falls der Datensatz seit der letzten Version aktualisiert.

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

Schließlich können Sie den Job mit dem folgenden Befehl starten:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

Örtlich

Um Ihr Skript lokal mit dem Standard-Apache Beam-Runner auszuführen, ist der Befehl der gleiche wie für andere Datasets:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

Mit einem benutzerdefinierten Skript

Um das Dataset in Beam zu generieren, ist die API dieselbe wie für andere Datasets, Sie müssen jedoch die Beam-Optionen oder den Runner an die DownloadConfig .

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

Implementieren eines Beam-Datasets

Voraussetzungen

Um Apache Beam-Datasets zu schreiben, sollten Sie mit den folgenden Konzepten vertraut sein:

Anleitung

Wenn Sie mit dem Handbuch zur Dataset-Erstellung vertraut sind, müssen zum Hinzufügen eines Beam-Datasets nur einige Änderungen vorgenommen werden:

  • Ihr DatasetBuilder erbt von tfds.core.BeamBasedBuilder anstelle von tfds.core.GeneratorBasedBuilder .
  • Beam-Datasets sollten die abstrakte Methode _build_pcollection(self, **kwargs) anstelle der Methode _generate_examples(self, **kwargs) . _build_pcollection sollte einen beam.PCollection zurückgeben. beam.PCollection mit den Beispielen, die dem Split zugeordnet sind.
  • Das Schreiben eines Komponententests für Ihr Beam-Dataset ist dasselbe wie bei anderen Datasets.

Einige zusätzliche Überlegungen:

  • Verwenden Sie tfds.core.lazy_imports , um Apache Beam zu importieren. Durch die Verwendung einer verzögerten Abhängigkeit können Benutzer das Dataset nach seiner Generierung weiterhin lesen, ohne Beam installieren zu müssen.
  • Seien Sie vorsichtig mit Python-Verschlüssen. Beim Ausführen der Pipeline werden die Funktionen beam.Map und beam.DoFn mithilfe von pickle serialisiert und an alle Mitarbeiter gesendet. Dies kann zu Fehlern führen. Wenn Sie beispielsweise ein veränderbares Objekt in Ihren Funktionen verwenden, das außerhalb der Funktion deklariert wurde, können pickle Fehler oder unerwartetes Verhalten auftreten. Die Korrektur besteht normalerweise darin, zu vermeiden, dass geschlossene Objekte mutiert werden.
  • Die Verwendung von Methoden für DatasetBuilder in der Beam-Pipeline ist in Ordnung. Bei der Serialisierung der Klasse während des Pickles werden Änderungen an Features während der Erstellung jedoch bestenfalls ignoriert.

Beispiel

Hier ist ein Beispiel für einen Beam-Datensatz. Ein komplizierteres Beispiel finden Sie im Wikipedia Datensatz .

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

Ausführen Ihrer Pipeline

Schauen Sie sich den obigen Abschnitt an, um die Pipeline auszuführen.

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset