此页面由 Cloud Translation API 翻译。
Switch to English

使用Apache Beam生成大型数据集

有些数据集太大,无法在一台计算机上进行处理。 tfds支持使用Apache Beam在许多机器上生成数据。

该文档分为两部分:

  • 对于想要生成现有Beam数据集的用户
  • 对于想要创建新的Beam数据集的开发人员

表中的内容:

生成Beam数据集

以下是在云上或本地生成Beam数据集的不同示例。

在Google Cloud Dataflow上

要使用Google Cloud Dataflow运行管道并利用分布式计算的优势,请首先遵循快速入门说明

设置好环境后,您可以使用GCS上的数据目录并为--beam_pipeline_options标志指定所需的选项来运行download_and_prepare脚本。

为了使启动脚本更容易,使用GCP / GCS设置和要生成的数据集的实际值定义以下变量会很有帮助:

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

然后,您需要创建一个文件来告诉Dataflow在工作tfds上安装tfds

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

如果您使用的是tfds-nightly ,请确保从tfds-nightlytfds-nightly ,以防自上次发行以来数据集已更新。

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

最后,您可以使用以下命令启动作业:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

在本地

要使用默认的Apache Beam运行器在本地运行脚本,该命令与其他数据集的命令相同:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

使用自定义脚本

要在Beam上生成数据集,API与其他数据集的API相同,但是您必须将Beam选项或流道传递给DownloadConfig

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

实施Beam数据集

先决条件

为了编写Apache Beam数据集,您应该熟悉以下概念:

使用说明

如果您熟悉数据集创建指南 ,则仅需进行一些修改即可添加Beam数据集:

  • 您的DatasetBuilder将继承自tfds.core.BeamBasedBuilder而不是tfds.core.GeneratorBasedBuilder
  • 梁数据集应实现抽象方法_build_pcollection(self, **kwargs)而不是方法_generate_examples(self, **kwargs)_build_pcollection应该返回beam.PCollection以及与拆分相关联的示例。
  • 为Beam数据集编写单元测试与其他数据集相同。

其他注意事项:

  • 使用tfds.core.lazy_imports导入Apache Beam。通过使用惰性依赖关系,用户仍可以在生成数据集后读取数据集,而不必安装Beam。
  • 使用Python闭包时要小心。在运行管道时,使用pickle序列化beam.Mapbeam.DoFn函数,并将其发送给所有工作人员。这会产生错误;例如,如果您在函数中使用了在函数外部声明的可变对象,则可能会遇到pickle错误或意外行为。解决方法通常是避免变异封闭的对象。
  • 在Beam管道中在DatasetBuilder上使用方法很好。但是,在腌制过程中类被序列化的方式,在创建过程中对要素所做的更改最多将被忽略。

这是Beam数据集的示例。对于更复杂的真实示例,请查看Wikipedia数据集

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

运行管道

要运行管道,请查看以上部分。

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset