Générer de grands ensembles de données avec Apache Beam

Certains ensembles de données sont trop volumineux pour être traités sur une seule machine. tfds prend en charge la génération de données sur de nombreuses machines à l'aide d'Apache Beam .

Ce document comporte deux sections :

  • Pour les utilisateurs souhaitant générer un jeu de données Beam existant
  • Pour les développeurs qui souhaitent créer un nouvel ensemble de données Beam

Générer un jeu de données Beam

Vous trouverez ci-dessous différents exemples de génération d'un ensemble de données Beam, à la fois sur le cloud ou localement.

Sur Google Cloud Dataflow

Pour exécuter le pipeline à l'aide de Google Cloud Dataflow et profiter du calcul distribué, suivez d'abord les instructions de démarrage rapide .

Une fois votre environnement configuré, vous pouvez exécuter la CLI tfds build en utilisant un répertoire de données sur GCS et en spécifiant les options requises pour l'indicateur --beam_pipeline_options .

Pour faciliter le lancement du script, il est utile de définir les variables suivantes à l'aide des valeurs réelles de votre configuration GCP/GCS et de l'ensemble de données que vous souhaitez générer :

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Vous devrez ensuite créer un fichier pour indiquer à Dataflow d'installer tfds sur les nœuds de calcul :

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Si vous utilisez tfds-nightly , assurez-vous de faire écho à tfds-nightly au cas où l'ensemble de données aurait été mis à jour depuis la dernière version.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Si vous utilisez des dépendances supplémentaires non incluses dans la bibliothèque TFDS, suivez les instructions de gestion des dépendances du pipeline Python .

Enfin, vous pouvez lancer le travail à l'aide de la commande ci-dessous :

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Localement

Pour exécuter votre script localement à l'aide du programme d'exécution Apache Beam par défaut (il doit contenir toutes les données en mémoire), la commande est la même que pour les autres ensembles de données :

tfds build my_dataset

Pour exécuter le pipeline à l'aide d' Apache Flink, vous pouvez lire la documentation officielle . Assurez-vous que votre Beam est conforme à la compatibilité des versions Flink

Pour faciliter le lancement du script, il est utile de définir les variables suivantes en utilisant les valeurs réelles de votre configuration Flink et l'ensemble de données que vous souhaitez générer :

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Pour exécuter sur un cluster Flink intégré, vous pouvez lancer la tâche à l'aide de la commande ci-dessous :

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Avec un script personnalisé

Pour générer le jeu de données sur Beam, l'API est la même que pour les autres jeux de données. Vous pouvez personnaliser le beam.Pipeline en utilisant les arguments beam_options (et beam_runner ) de DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implémentation d'un jeu de données Beam

Conditions préalables

Afin d'écrire des ensembles de données Apache Beam, vous devez être familier avec les concepts suivants :

Instructions

Si vous connaissez le guide de création de jeux de données , l'ajout d'un jeu de données Beam nécessite uniquement de modifier la fonction _generate_examples . La fonction doit renvoyer un objet poutre, plutôt qu'un générateur :

Ensemble de données sans faisceau :

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Ensemble de données de poutre :

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Tout le reste peut être 100% identique, tests compris.

Quelques considérations supplémentaires :

  • Utilisez tfds.core.lazy_imports pour importer Apache Beam. En utilisant une dépendance paresseuse, les utilisateurs peuvent toujours lire l'ensemble de données après sa génération sans avoir à installer Beam.
  • Soyez prudent avec les fermetures Python. Lors de l'exécution du pipeline, les fonctions beam.Map et beam.DoFn sont sérialisées à l'aide pickle et envoyées à tous les travailleurs. N'utilisez pas d'objets mutables à l'intérieur d'un beam.PTransform si l'état doit être partagé entre les Workers.
  • En raison de la façon dont tfds.core.DatasetBuilder est sérialisé avec pickle, la mutation tfds.core.DatasetBuilder lors de la création des données sera ignorée sur les travailleurs (par exemple, il n'est pas possible de définir self.info.metadata['offset'] = 123 dans _split_generators et accédez-y à partir des travailleurs comme beam.Map(lambda x: x + self.info.metadata['offset'])
  • Si vous devez partager certaines étapes du pipeline entre les divisions, vous pouvez ajouter un pipeline: beam.Pipeline kwarg à _split_generator et contrôler le pipeline de génération complet. Voir la documentation _generate_examples de tfds.core.GeneratorBasedBuilder .

Exemple

Voici un exemple d'un ensemble de données Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Exécution de votre pipeline

Pour exécuter le pipeline, consultez la section ci-dessus.

tfds build my_dataset --register_checksums

Pipeline utilisant TFDS comme entrée

Si vous souhaitez créer un pipeline de poutres qui prend un jeu de données TFDS comme source, vous pouvez utiliser tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Il traitera chaque fragment de l'ensemble de données en parallèle.