Gerando grandes conjuntos de dados com Apache Beam

Alguns conjuntos de dados são muito grandes para serem processados ​​em uma única máquina. tfds suportes gerando dados através de muitas máquinas usando Apache feixe .

Este documento tem duas seções:

  • Para o usuário que deseja gerar um conjunto de dados Beam existente
  • Para desenvolvedores que desejam criar um novo conjunto de dados Beam

Gerando um conjunto de dados Beam

Abaixo estão diferentes exemplos de geração de um conjunto de dados Beam, tanto na nuvem quanto localmente.

No Google Cloud Dataflow

Para executar o pipeline usando o Google Cloud Dataflow e tirar proveito da computação distribuída, primeiro siga as instruções de início rápido .

Uma vez que seu ambiente está configurado, você pode executar o tfds build CLI usando um diretório de dados no GCS e especificando as opções necessárias para o --beam_pipeline_options bandeira.

Para facilitar a inicialização do script, é útil definir as seguintes variáveis ​​usando os valores reais para a configuração do GCP / GCS e o conjunto de dados que você deseja gerar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Em seguida, terá de criar um arquivo para contar Dataflow para instalar tfds sobre os trabalhadores:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Se você estiver usando tfds-nightly , certifique-se de fazer eco de tfds-nightly no caso do conjunto de dados foi atualizada desde a última versão.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Finalmente, você pode iniciar o trabalho usando o comando abaixo:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Localmente

Para executar seu script localmente usando o executor Apache Beam padrão, o comando é o mesmo que para outros conjuntos de dados:

tfds build my_dataset

Com um script personalizado

Para gerar o conjunto de dados no Beam, a API é a mesma que para outros conjuntos de dados. Você pode personalizar o beam.Pipeline usando os beam_options (e beam_runner ) argumentos de DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementando um conjunto de dados Beam

Pré-requisitos

Para escrever conjuntos de dados do Apache Beam, você deve estar familiarizado com os seguintes conceitos:

Instruções

Se você estiver familiarizado com o guia de criação do conjunto de dados , adicionando um conjunto de dados Feixe requer apenas para modificar o _generate_examples função. A função deve retornar um objeto de feixe, em vez de um gerador:

Conjunto de dados sem feixe:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Conjunto de dados do feixe:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Todo o resto pode ser 100% idêntico, incluindo testes.

Algumas considerações adicionais:

  • Use tfds.core.lazy_imports importar Apache Beam. Usando uma dependência preguiçosa, os usuários ainda podem ler o conjunto de dados após ele ter sido gerado, sem precisar instalar o Beam.
  • Tenha cuidado com os encerramentos Python. Ao executar o pipeline, os beam.Map e beam.DoFn funções são serializados utilizando pickle e enviado a todos os trabalhadores. Fazer objetos não mutáveis dentro de um beam.PTransform se o Estado tem de ser compartilhada entre os trabalhadores.
  • Devido à forma como tfds.core.DatasetBuilder é serializado com picles, mutação tfds.core.DatasetBuilder durante a criação de dados será ignorado sobre os trabalhadores (por exemplo, não é possível definir self.info.metadata['offset'] = 123 em _split_generators e acessá-lo a partir dos trabalhadores como beam.Map(lambda x: x + self.info.metadata['offset']) )
  • Se você precisa compartilhar alguns passos de dutos entre as divisões, você pode adicionar adicionar um extra de pipeline: beam.Pipeline kwarg para _split_generator e controlar o pipeline cheio geração. Veja _generate_examples documentação de tfds.core.GeneratorBasedBuilder .

Exemplo

Aqui está um exemplo de um conjunto de dados Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Executando seu pipeline

Para executar o pipeline, dê uma olhada na seção acima.

tfds build my_dataset --register_checksums

Pipeline usando TFDS como entrada

Se você deseja criar um pipeline de feixe que tem um conjunto de dados TFDS como a fonte, você pode usar os tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Ele processará cada fragmento do conjunto de dados em paralelo.