Aiuto proteggere la Grande Barriera Corallina con tensorflow sul Kaggle Join Sfida

Generazione di grandi set di dati con Apache Beam

Alcuni set di dati sono troppo grandi per essere elaborati su una singola macchina. tfds supporti la generazione di dati attraverso molte macchine utilizzando Apache fascio .

Questo documento ha due sezioni:

  • Per gli utenti che desiderano generare un set di dati Beam esistente
  • Per gli sviluppatori che desiderano creare un nuovo set di dati Beam

Generazione di un set di dati Beam

Di seguito sono riportati diversi esempi di generazione di un set di dati Beam, sia sul cloud che localmente.

Sul flusso di dati di Google Cloud

Per eseguire la pipeline utilizzando Google Cloud flusso di dati e approfittare di calcolo distribuito, in primo luogo seguire le istruzioni Quickstart .

Una volta che il vostro ambiente è impostato, è possibile eseguire il tfds build CLI utilizzando un elenco di dati sulla GCS e specificando le opzioni necessarie per la --beam_pipeline_options bandiera.

Per facilitare l'avvio dello script, è utile definire le seguenti variabili utilizzando i valori effettivi per la tua configurazione GCP/GCS e il set di dati che desideri generare:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Sarà quindi necessario creare un file di dire flusso di dati da installare tfds sui lavoratori:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Se stai usando tfds-nightly , assicurarsi di echeggiare da tfds-nightly nel caso in cui l'insieme di dati è stata aggiornata dall'ultima release.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Infine, puoi avviare il lavoro utilizzando il comando seguente:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Localmente

Per eseguire lo script localmente utilizzando il runner Apache Beam predefinito, il comando è lo stesso degli altri set di dati:

tfds build my_dataset

Con uno script personalizzato

Per generare il set di dati su Beam, l'API è la stessa degli altri set di dati. È possibile personalizzare la beam.Pipeline utilizzando le beam_options (e beam_runner ) Argomenti delle DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementazione di un set di dati Beam

Prerequisiti

Per scrivere set di dati Apache Beam, dovresti avere familiarità con i seguenti concetti:

Istruzioni

Se si ha familiarità con la Guida alla creazione di set di dati , l'aggiunta di un set di dati del fascio richiede solo di modificare la _generate_examples funzioni. La funzione dovrebbe restituire un oggetto trave, piuttosto che un generatore:

Set di dati non fascio:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Set di dati del raggio:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Tutto il resto può essere identico al 100%, inclusi i test.

Alcune considerazioni aggiuntive:

  • Utilizzare tfds.core.lazy_imports importare Apache Beam. Utilizzando una dipendenza pigra, gli utenti possono ancora leggere il set di dati dopo che è stato generato senza dover installare Beam.
  • Fai attenzione con le chiusure Python. Quando si esegue la pipeline, le beam.Map e beam.DoFn funzioni vengono serializzati mediante pickle e inviato a tutti i lavoratori. Fare oggetti non mutabili all'interno di un beam.PTransform se lo stato deve essere condiviso tra lavoratori.
  • A causa del modo tfds.core.DatasetBuilder viene serializzato con salamoia, mutando tfds.core.DatasetBuilder durante la creazione dei dati saranno ignorati sui lavoratori (ad esempio, non è possibile impostare self.info.metadata['offset'] = 123 in _split_generators e accedervi da parte dei lavoratori come beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Se avete bisogno di condividere alcuni passi della pipeline tra le spaccature, è possibile aggiungere aggiungere un ulteriore pipeline: beam.Pipeline kwarg per _split_generator e controllare la pipeline full generazione. Vedere _generate_examples documentazione tfds.core.GeneratorBasedBuilder .

Esempio

Ecco un esempio di set di dati Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Gestire la tua pipeline

Per eseguire la pipeline, dai un'occhiata alla sezione precedente.

tfds build my_dataset --register_checksums

Pipeline che utilizza TFDS come input

Se si desidera creare una pipeline fascio che prende un set di dati TFDS come sorgente, è possibile utilizzare i tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Elaborerà ogni frammento del set di dati in parallelo.