本頁面由 Cloud Translation API 翻譯而成。
Switch to English

使用Apache Beam生成大型數據集

有些數據集太大,無法在一台計算機上進行處理。 tfds支持使用Apache Beam在許多機器上生成數據。

該文檔分為兩部分:

  • 對於想要生成現有Beam數據集的用戶
  • 對於想要創建新的Beam數據集的開發人員

表中的內容:

生成Beam數據集

以下是在雲上或本地生成Beam數據集的不同示例。

在Google Cloud Dataflow上

要使用Google Cloud Dataflow運行管道並利用分佈式計算的優勢,請首先遵循快速入門說明

設置好環境後,您可以使用GCS上的數據目錄並為--beam_pipeline_options標誌指定所需的選項來運行download_and_prepare腳本。

為了使啟動腳本更容易,使用GCP / GCS設置和要生成的數據集的實際值定義以下變量會很有幫助:

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

然後,您需要創建一個文件來告訴Dataflow在工作tfds上安裝tfds

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

如果您使用的是tfds-nightly ,請確保從tfds-nightlytfds-nightly ,以防自上次發行以來數據集已更新。

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

最後,您可以使用以下命令啟動作業:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

在本地

要使用默認的Apache Beam運行器在本地運行腳本,該命令與其他數據集的命令相同:

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

使用自定義腳本

要在Beam上生成數據集,API與其他數據集的API相同,但是您必須將Beam選項或流道傳遞給DownloadConfig

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

實施Beam數據集

先決條件

為了編寫Apache Beam數據集,您應該熟悉以下概念:

使用說明

如果您熟悉數據集創建指南 ,那麼添加Beam數據集只需進行一些修改:

  • 您的DatasetBuilder將繼承自tfds.core.BeamBasedBuilder而不是tfds.core.GeneratorBasedBuilder
  • 梁數據集應實現抽象方法_build_pcollection(self, **kwargs)而不是方法_generate_examples(self, **kwargs)_build_pcollection應該返回beam.PCollection以及與拆分相關聯的示例。
  • 為Beam數據集編寫單元測試與其他數據集相同。

其他注意事項:

  • 使用tfds.core.lazy_imports導入Apache Beam。通過使用惰性依賴關係,用戶仍可以在生成數據集後讀取數據集,而不必安裝Beam。
  • 使用Python閉包時要小心。運行管道時,使用pickle序列化beam.Mapbeam.DoFn函數,並將其發送給所有工作人員。這會產生錯誤;例如,如果您在函數中使用了在函數外部聲明的可變對象,則可能會遇到pickle錯誤或意外行為。解決方法通常是避免更改封閉的對象。
  • 在Beam管道中在DatasetBuilder上使用方法很好。但是,在醃製過程中類被序列化的方式,在創建過程中對要素所做的更改最多將被忽略。

這是Beam數據集的示例。對於更複雜的真實示例,請查看Wikipedia數據集

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

運行管道

要運行管道,請查看以上部分。

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset