Certains ensembles de données sont trop volumineux pour être traités sur une seule machine. tfds
prend en charge la génération de données sur de nombreuses machines à l'aide d'Apache Beam .
Ce document comporte deux sections :
- Pour les utilisateurs qui souhaitent générer un jeu de données Beam existant
- Pour les développeurs qui souhaitent créer un nouveau jeu de données Beam
Génération d'un jeu de données Beam
Vous trouverez ci-dessous différents exemples de génération d'un jeu de données Beam, à la fois sur le cloud ou localement.
Sur Google Cloud Dataflow
Pour exécuter le pipeline à l'aide de Google Cloud Dataflow et tirer parti du calcul distribué, suivez d'abord les instructions de démarrage rapide .
Une fois votre environnement configuré, vous pouvez exécuter l' interface de ligne tfds build
à l'aide d'un répertoire de données sur GCS et en spécifiant les options requises pour l'indicateur --beam_pipeline_options
.
Pour faciliter le lancement du script, il est utile de définir les variables suivantes à l'aide des valeurs réelles de votre configuration GCP/GCS et de l'ensemble de données que vous souhaitez générer :
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
Vous devrez ensuite créer un fichier pour indiquer à Dataflow d'installer tfds
sur les nœuds de calcul :
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
Si vous utilisez tfds-nightly
, assurez-vous de faire écho à partir de tfds-nightly
au cas où l'ensemble de données aurait été mis à jour depuis la dernière version.
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
Enfin, vous pouvez lancer la tâche à l'aide de la commande ci-dessous :
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
Localement
Pour exécuter votre script localement à l'aide de l'exécuteur Apache Beam par défaut (il doit contenir toutes les données en mémoire), la commande est la même que pour les autres ensembles de données :
tfds build my_dataset
Avec ApacheFlink
Pour exécuter le pipeline à l'aide d' Apache Flink, vous pouvez lire la documentation officielle . Assurez-vous que votre Beam est compatible avec la compatibilité des versions Flink
Pour faciliter le lancement du script, il est utile de définir les variables suivantes à l'aide des valeurs réelles de votre configuration Flink et du jeu de données que vous souhaitez générer :
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
Pour exécuter sur un cluster Flink intégré, vous pouvez lancer la tâche à l'aide de la commande ci-dessous :
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
Avec un script personnalisé
Pour générer le jeu de données sur Beam, l'API est la même que pour les autres jeux de données. Vous pouvez personnaliser le beam.Pipeline
en utilisant les arguments beam_options
(et beam_runner
) de DownloadConfig
.
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
Implémentation d'un jeu de données Beam
Conditions préalables
Pour écrire des ensembles de données Apache Beam, vous devez être familiarisé avec les concepts suivants :
- Familiarisez-vous avec le guide de création de jeux de données
tfds
, car la plupart du contenu s'applique toujours aux jeux de données Beam. - Obtenez une introduction à Apache Beam avec le guide de programmation Beam .
- Si vous souhaitez générer votre ensemble de données à l'aide de Cloud Dataflow, lisez la documentation Google Cloud et le guide des dépendances Apache Beam .
Des instructions
Si vous connaissez le guide de création de jeu de données , l'ajout d'un jeu de données Beam nécessite uniquement de modifier la fonction _generate_examples
. La fonction doit renvoyer un objet faisceau, plutôt qu'un générateur :
Jeu de données sans faisceau :
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Jeu de données de faisceau :
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
Tout le reste peut être identique à 100%, y compris les tests.
Quelques considérations supplémentaires :
- Utilisez
tfds.core.lazy_imports
pour importer Apache Beam. En utilisant une dépendance paresseuse, les utilisateurs peuvent toujours lire le jeu de données après sa génération sans avoir à installer Beam. - Soyez prudent avec les fermetures Python. Lors de l'exécution du pipeline, les fonctions
beam.Map
etbeam.DoFn
sont sérialisées à l'aidepickle
et envoyées à tous les travailleurs. N'utilisez pas d'objets modifiables à l'intérieur d'unbeam.PTransform
si l'état doit être partagé entre les travailleurs. - En raison de la façon dont
tfds.core.DatasetBuilder
est sérialisé avec pickle, la mutationtfds.core.DatasetBuilder
lors de la création de données sera ignorée sur les travailleurs (par exemple, il n'est pas possible de définirself.info.metadata['offset'] = 123
dans_split_generators
et y accéder à partir des travailleurs commebeam.Map(lambda x: x + self.info.metadata['offset'])
- Si vous avez besoin de partager certaines étapes du pipeline entre les divisions, vous pouvez ajouter un
pipeline: beam.Pipeline
kwarg to_split_generator
et contrôler le pipeline de génération complet. Voir la documentation_generate_examples
detfds.core.GeneratorBasedBuilder
.
Exemple
Voici un exemple de jeu de données Beam.
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
Exécution de votre pipeline
Pour exécuter le pipeline, consultez la section ci-dessus.
tfds build my_dataset --register_checksums
Pipeline utilisant TFDS comme entrée
Si vous souhaitez créer un pipeline de faisceau qui prend un jeu de données TFDS comme source, vous pouvez utiliser le tfds.beam.ReadFromTFDS
:
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
Il traitera chaque fragment de l'ensemble de données en parallèle.