Gerando grandes conjuntos de dados com Apache Beam

Alguns conjuntos de dados são muito grandes para serem processados ​​em uma única máquina. tfds oferece suporte à geração de dados em várias máquinas usando o Apache Beam .

Este documento tem duas seções:

  • Para o usuário que deseja gerar um conjunto de dados existente do Beam
  • Para desenvolvedores que desejam criar um novo conjunto de dados do Beam

Gerando um conjunto de dados do Beam

Abaixo estão diferentes exemplos de geração de um conjunto de dados do Beam, tanto na nuvem quanto localmente.

No Google Cloud Dataflow

Para executar o pipeline usando o Google Cloud Dataflow e aproveitar a computação distribuída, primeiro siga as instruções de início rápido .

Depois que seu ambiente estiver configurado, você poderá executar a CLI tfds build usando um diretório de dados no GCS e especificando as opções necessárias para o sinalizador --beam_pipeline_options .

Para facilitar o lançamento do script, é útil definir as seguintes variáveis ​​usando os valores reais para sua configuração GCP/GCS e o conjunto de dados que você deseja gerar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

Você precisará criar um arquivo para instruir o Dataflow a instalar tfds nos workers:

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

Se você estiver usando tfds-nightly , certifique-se de ecoar de tfds-nightly caso o conjunto de dados tenha sido atualizado desde a última versão.

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

Por fim, você pode iniciar o trabalho usando o comando abaixo:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

Localmente

Para executar seu script localmente usando o executor Apache Beam padrão (deve caber todos os dados na memória), o comando é o mesmo para outros conjuntos de dados:

tfds build my_dataset

Para executar o pipeline usando o Apache Flink, você pode ler a documentação oficial . Verifique se o seu Beam é compatível com a compatibilidade de versão do Flink

Para facilitar o lançamento do script, é útil definir as seguintes variáveis ​​usando os valores reais para sua configuração do Flink e o conjunto de dados que você deseja gerar:

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>

Para executar em um cluster Flink incorporado, você pode iniciar o trabalho usando o comando abaixo:

tfds build $DATASET_NAME/$DATASET_CONFIG \
  --beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"

Com um script personalizado

Para gerar o conjunto de dados no Beam, a API é a mesma de outros conjuntos de dados. Você pode personalizar o beam.Pipeline usando os argumentos beam_options (e beam_runner ) de DownloadConfig .

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)

Implementando um conjunto de dados do Beam

Pré-requisitos

Para escrever conjuntos de dados do Apache Beam, você deve estar familiarizado com os seguintes conceitos:

Instruções

Se você estiver familiarizado com o guia de criação de conjunto de dados , adicionar um conjunto de dados do Beam requer apenas modificar a função _generate_examples . A função deve retornar um objeto de feixe, em vez de um gerador:

Conjunto de dados não feixe:

def _generate_examples(self, path):
  for f in path.iterdir():
    yield _process_example(f)

Conjunto de dados do feixe:

def _generate_examples(self, path):
  return (
      beam.Create(path.iterdir())
      | beam.Map(_process_example)
  )

Todo o resto pode ser 100% idêntico, incluindo testes.

Algumas considerações adicionais:

  • Use tfds.core.lazy_imports para importar o Apache Beam. Ao usar uma dependência preguiçosa, os usuários ainda podem ler o conjunto de dados depois de gerado sem precisar instalar o Beam.
  • Tenha cuidado com fechamentos Python. Ao executar o pipeline, as funções beam.Map e beam.DoFn são serializadas usando pickle e enviadas para todos os trabalhadores. Não use objetos mutáveis ​​dentro de um beam.PTransform se o estado tiver que ser compartilhado entre os trabalhadores.
  • Devido à forma como tfds.core.DatasetBuilder é serializado com pickle, a mutação tfds.core.DatasetBuilder durante a criação de dados será ignorada nos trabalhadores (por exemplo, não é possível definir self.info.metadata['offset'] = 123 em _split_generators e acesse-o a partir dos trabalhadores como beam.Map(lambda x: x + self.info.metadata['offset'])
  • Se você precisar compartilhar algumas etapas do pipeline entre as divisões, poderá adicionar um pipeline: beam.Pipeline kwarg para _split_generator e controlar o pipeline de geração completo. Consulte a documentação _generate_examples de tfds.core.GeneratorBasedBuilder .

Exemplo

Aqui está um exemplo de um conjunto de dados do Beam.

class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return self.dataset_info_from_configs(
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return {
        'train': self._generate_examples(file_dir='path/to/train_data/'),
        'test': self._generate_examples(file_dir='path/to/test_data/'),
    }

  def _generate_examples(self, file_dir: str):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

Executando seu pipeline

Para executar o pipeline, consulte a seção acima.

tfds build my_dataset --register_checksums

Pipeline usando TFDS como entrada

Se você deseja criar um pipeline de feixe que usa um conjunto de dados TFDS como fonte, pode usar tfds.beam.ReadFromTFDS :

builder = tfds.builder('my_dataset')

_ = (
    pipeline
    | tfds.beam.ReadFromTFDS(builder, split='train')
    | beam.Map(tfds.as_numpy)
    | ...
)

Ele processará cada fragmento do conjunto de dados em paralelo.