Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Generación de grandes conjuntos de datos con Apache Beam

Algunos conjuntos de datos son demasiado grandes para procesarse en una sola máquina. tfds admite la generación de datos en muchas máquinas mediante Apache Beam .

Este documento tiene dos secciones:

  • Para usuarios que desean generar un conjunto de datos de Beam existente
  • Para desarrolladores que desean crear un nuevo conjunto de datos de Beam

Generando un conjunto de datos de Beam

A continuación, se muestran diferentes ejemplos de generación de un conjunto de datos de Beam, tanto en la nube como localmente.

En Google Cloud Dataflow

Para ejecutar la canalización con Google Cloud Dataflow y aprovechar la computación distribuida, primero siga las instrucciones de inicio rápido .

Una vez que su entorno está configurado, puede ejecutar la CLI de tfds build usando un directorio de datos en GCS y especificando las opciones requeridas para la --beam_pipeline_options .

Para facilitar el lanzamiento de la secuencia de comandos, es útil definir las siguientes variables utilizando los valores reales para su configuración de GCP / GCS y el conjunto de datos que desea generar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Luego, deberá crear un archivo para decirle a Dataflow que instale tfds en los trabajadores:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Si está utilizando tfds-nightly , asegúrese de hacer eco de tfds-nightly en caso de que el conjunto de datos se haya actualizado desde la última versión.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Finalmente, puede iniciar el trabajo usando el siguiente comando:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

En la zona

Para ejecutar su secuencia de comandos localmente utilizando el corredor Apache Beam predeterminado, el comando es el mismo que para otros conjuntos de datos:

tfds build my_dataset

Con un guión personalizado

Para generar el conjunto de datos en Beam, la API es la misma que para otros conjuntos de datos. Puede personalizar el beam.Pipeline usando los beam_options (y beam_runner ) de DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementar un conjunto de datos de Beam

Prerrequisitos

Para escribir conjuntos de datos de Apache Beam, debe estar familiarizado con los siguientes conceptos:

Instrucciones

Si está familiarizado con la guía de creación de conjuntos de datos , agregar un conjunto de datos de Beam solo requiere modificar la función _generate_examples . La función debería devolver un objeto de haz, en lugar de un generador:

Conjunto de datos sin haz:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Conjunto de datos de haz:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Todo lo demás puede ser 100% idéntico, incluidas las pruebas.

Algunas consideraciones adicionales:

  • Utilice tfds.core.lazy_imports para importar Apache Beam. Al usar una dependencia perezosa, los usuarios aún pueden leer el conjunto de datos después de que se haya generado sin tener que instalar Beam.
  • Tenga cuidado con los cierres de Python. Cuando se ejecuta la tubería, las funciones beam.Map y beam.DoFn se serializan usando pickle y se envían a todos los trabajadores. No beam.PTransform objetos dentro de una beam.PTransform si el estado debe ser compartido entre trabajadores.
  • Debido a la forma en que tfds.core.DatasetBuilder se serializa con pickle, la mutación de tfds.core.DatasetBuilder durante la creación de datos se ignorará en los trabajadores (por ejemplo, no es posible establecer self.info.metadata['offset'] = 123 en _split_generators y acceda a él desde los trabajadores como beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Si necesita compartir algunos pasos de la canalización entre las divisiones, puede agregar una pipeline: beam.Pipeline adicional pipeline: beam.Pipeline kwarg a _split_generator y controlar la canalización de generación completa. Ver _generate_examples documentación de tfds.core.GeneratorBasedBuilder .

Ejemplo

A continuación, se muestra un ejemplo de un conjunto de datos de Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Ejecutando su canalización

Para ejecutar la canalización, eche un vistazo a la sección anterior.

tfds build my_dataset --register_checksums