Ayuda a proteger la Gran Barrera de Coral con TensorFlow en Kaggle Únete Challenge

Generación de grandes conjuntos de datos con Apache Beam

Algunos conjuntos de datos son demasiado grandes para procesarse en una sola máquina. tfds soportes que generan datos a través de muchas máquinas utilizando Apache Beam .

Este documento tiene dos secciones:

  • Para usuarios que desean generar un conjunto de datos de Beam existente
  • Para desarrolladores que desean crear un nuevo conjunto de datos de Beam

Generando un conjunto de datos de Beam

A continuación, se muestran diferentes ejemplos de cómo generar un conjunto de datos de Beam, tanto en la nube como localmente.

En Google Cloud Dataflow

Para ejecutar la tubería utilizando Google Cloud flujo de datos y tomar ventaja de la computación distribuida, primero siga las instrucciones de inicio rápido .

Una vez que su entorno está configurado, puede ejecutar el tfds build CLI utilizando un directorio de datos de GCS y especificando las opciones requeridas para el --beam_pipeline_options bandera.

Para facilitar el lanzamiento de la secuencia de comandos, es útil definir las siguientes variables utilizando los valores reales para su configuración de GCP / GCS y el conjunto de datos que desea generar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

A continuación, tendrá que crear un archivo para contar flujo de datos para instalar tfds sobre los trabajadores:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Si está utilizando tfds-nightly , asegúrese de hacerse eco de tfds-nightly en caso de que el conjunto de datos se ha actualizado desde la última versión.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Finalmente, puede iniciar el trabajo usando el siguiente comando:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

En la zona

Para ejecutar su secuencia de comandos localmente utilizando el corredor Apache Beam predeterminado, el comando es el mismo que para otros conjuntos de datos:

tfds build my_dataset

Con un guión personalizado

Para generar el conjunto de datos en Beam, la API es la misma que para otros conjuntos de datos. Se puede personalizar la beam.Pipeline utilizando los beam_options (y beam_runner ) argumentos de DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementar un conjunto de datos de Beam

Prerrequisitos

Para escribir conjuntos de datos de Apache Beam, debe estar familiarizado con los siguientes conceptos:

Instrucciones

Si está familiarizado con la guía de creación de base de datos , añadiendo un conjunto de datos Beam sólo requiere modificar el _generate_examples función. La función debería devolver un objeto de haz, en lugar de un generador:

Conjunto de datos sin haz:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Conjunto de datos de haz:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Todo lo demás puede ser 100% idéntico, incluidas las pruebas.

Algunas consideraciones adicionales:

  • Utilice tfds.core.lazy_imports importar Apache Beam. Al usar una dependencia perezosa, los usuarios aún pueden leer el conjunto de datos después de que se haya generado sin tener que instalar Beam.
  • Tenga cuidado con los cierres de Python. Cuando se ejecuta la tubería, los beam.Map y beam.DoFn funciones se serializan con pickle y se envían a todos los trabajadores. Hacer objetos no mutables dentro de un beam.PTransform si el estado tiene que ser compartida entre los trabajadores.
  • Debido a la forma tfds.core.DatasetBuilder se serializa con salmuera, mutando tfds.core.DatasetBuilder durante la creación de los datos se hizo caso omiso a los trabajadores (por ejemplo, no es posible establecer self.info.metadata['offset'] = 123 en _split_generators y acceder a ella desde los trabajadores como beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Si necesita compartir algunos pasos de tuberías entre las divisiones, se puede añadir añadir un extra de pipeline: beam.Pipeline kwarg a _split_generator y controlar la tubería llena generación. Ver _generate_examples documentación de tfds.core.GeneratorBasedBuilder .

Ejemplo

A continuación, se muestra un ejemplo de un conjunto de datos de Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Ejecutando su canalización

Para ejecutar la canalización, eche un vistazo a la sección anterior.

tfds build my_dataset --register_checksums

Canalización usando TFDS como entrada

Si desea crear una fuente de haz que tiene un conjunto de datos TFDS como fuente, puede utilizar los tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Procesará cada fragmento del conjunto de datos en paralelo.