Cette page a été traduite par l'API Cloud Translation.
Switch to English

Générer de grands ensembles de données avec Apache Beam

Certains ensembles de données sont trop volumineux pour être traités sur une seule machine. tfds prend en charge la génération de données sur de nombreuses machines à l'aide d' Apache Beam .

Ce document comporte deux sections:

  • Pour l'utilisateur qui souhaite générer un jeu de données Beam existant
  • Pour les développeurs qui souhaitent créer un nouvel ensemble de données Beam

Table des matières:

Génération d'un jeu de données Beam

Vous trouverez ci-dessous différents exemples de génération d'un ensemble de données Beam, à la fois sur le cloud ou localement.

Sur Google Cloud Dataflow

Pour exécuter le pipeline à l'aide de Google Cloud Dataflow et tirer parti du calcul distribué, suivez d'abord les instructions de démarrage rapide .

Une fois votre environnement configuré, vous pouvez exécuter le script download_and_prepare utilisant un répertoire de données sur GCS et en spécifiant les options requises pour l'indicateur --beam_pipeline_options .

Pour faciliter le lancement du script, il est utile de définir les variables suivantes à l'aide des valeurs réelles de votre configuration GCP / GCS et de l'ensemble de données que vous souhaitez générer:

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

Vous devrez ensuite créer un fichier pour indiquer à Dataflow d'installer tfds sur les workers:

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

Si vous utilisez tfds-nightly , assurez-vous de faire écho à tfds-nightly au cas où l'ensemble de données aurait été mis à jour depuis la dernière version.

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

Enfin, vous pouvez lancer le travail en utilisant la commande ci-dessous:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

Localement

Pour exécuter votre script localement à l'aide du runner Apache Beam par défaut, la commande est la même que pour les autres ensembles de données:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

Avec un script personnalisé

Pour générer l'ensemble de données sur Beam, l'API est la même que pour les autres ensembles de données, mais vous devez transmettre les options ou le runner Beam à DownloadConfig .

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

Implémentation d'un jeu de données Beam

Conditions préalables

Pour écrire des ensembles de données Apache Beam, vous devez être familiarisé avec les concepts suivants:

Instructions

Si vous connaissez le guide de création de jeux de données , l'ajout d'un jeu de données Beam ne nécessite que quelques modifications:

  • Votre DatasetBuilder héritera de tfds.core.BeamBasedBuilder au lieu de tfds.core.GeneratorBasedBuilder .
  • Les ensembles de données Beam doivent implémenter la méthode abstraite _build_pcollection(self, **kwargs) au lieu de la méthode _generate_examples(self, **kwargs) . _build_pcollection doit renvoyer un beam.PCollection avec les exemples associés au fractionnement.
  • L'écriture d'un test unitaire pour votre jeu de données Beam est identique à celui des autres jeux de données.

Quelques considérations supplémentaires:

  • Utilisez tfds.core.lazy_imports pour importer Apache Beam. En utilisant une dépendance paresseuse, les utilisateurs peuvent toujours lire l'ensemble de données après qu'il a été généré sans avoir à installer Beam.
  • Soyez prudent avec les fermetures Python. Lors de l'exécution du pipeline, les fonctions beam.Map et beam.DoFn sont sérialisées à l'aide de pickle et envoyées à tous les travailleurs. Cela peut créer des bogues; par exemple, si vous utilisez un objet mutable dans vos fonctions qui a été déclaré en dehors de la fonction, vous pouvez rencontrer des erreurs de pickle ou un comportement inattendu. Le correctif consiste généralement à éviter la mutation des objets fermés.
  • L'utilisation de méthodes sur DatasetBuilder dans le pipeline Beam est très bien. Cependant, la façon dont la classe est sérialisée pendant le pickle, les modifications apportées aux fonctionnalités lors de la création seront au mieux ignorées.

Exemple

Voici un exemple de jeu de données Beam. Pour un exemple réel plus compliqué, jetez un œil à Wikipedia ensemble de données Wikipedia .

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

Exécuter votre pipeline

Pour exécuter le pipeline, consultez la section ci-dessus.

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset