このページは Cloud Translation API によって翻訳されました。
Switch to English

Apache Beamで大きなデータセットを生成する

一部のデータセットは、1台のマシンで処理するには大きすぎます。 tfdsは、 Apache Beamを使用して多くのマシン間でデータを生成することをサポートしています。

このドキュメントには2つのセクションがあります。

  • 既存のBeamデータセットを生成したいユーザー向け
  • 新しいBeamデータセットを作成したい開発者向け

目次:

Beamデータセットを生成する

以下は、クラウド上またはローカルの両方でBeamデータセットを生成するさまざまな例です。

Google Cloud Dataflow

Google Cloud Dataflowを使用してパイプラインを実行し、分散計算を利用するには、まずクイックスタートの手順に従います。

環境をセットアップしたら、 GCSのデータディレクトリを使用してdownload_and_prepareスクリプトを実行し、 --beam_pipeline_optionsフラグに必要なオプションを指定できます

スクリプトを簡単に起動できるようにするには、GCP / GCS設定の実際の値と生成するデータセットを使用して、次の変数を定義すると便利です。

 DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
 

次に、Dataflowにtfdsをワーカーにインストールするように指示するファイルを作成する必要があります。

 echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

tfds-nightlyを使用している場合は、前回のリリース以降にデータセットが更新されている場合に備えて、必ずtfds-nightlyからエコーしてtfds-nightly

 echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
 

最後に、以下のコマンドを使用してジョブを起動できます。

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
 

ローカルで

デフォルトのApache Beamランナーを使用してスクリプトをローカルで実行するためのコマンドは、他のデータセットの場合と同じです。

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset
 

カスタムスクリプトを使用

Beamでデータセットを生成するには、APIは他のデータセットと同じですが、BeamオプションまたはランナーをDownloadConfigに渡す必要があります。

 # If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)
 

Beamデータセットの実装

前提条件

Apache Beamデータセットを作成するには、次の概念に精通している必要があります。

指示

データセット作成ガイドに精通している場合、Beamデータセットを追加するには、いくつかの変更のみが必要です。

  • DatasetBuilderは、 tfds.core.BeamBasedBuilderではなくtfds.core.GeneratorBasedBuilderから継承します。
  • ビームデータセットは、抽象メソッドを実装する必要があり_build_pcollection(self, **kwargs)代わりに、メソッドの_generate_examples(self, **kwargs)_build_pcollectionは、スプリットに関連付けられた例をbeam.PCollection _build_pcollectionを返す必要があります。
  • Beamデータセットの単体テストの記述は、他のデータセットの場合と同じです。

その他の考慮事項:

  • Apache Beamをインポートするには、 tfds.core.lazy_importsを使用します。遅延依存関係を使用することにより、ユーザーはBeamをインストールしなくても、生成されたデータセットを読み取ることができます。
  • Pythonクロージャには注意してください。パイプラインを実行している場合、 beam.Mapbeam.DoFn機能を使用して直列化されるpickleし、すべての労働者に送られます。これによりバグが発生する可能性があります。たとえば、関数の外部で宣言されている可変オブジェクトを関数で使用している場合、 pickleまたは予期しない動作が発生する可能性があります。修正は通常、閉じたオブジェクトの変更を回避することです。
  • BeamパイプラインでDatasetBuilderメソッドを使用することはDatasetBuilder 。ただし、pickle中にクラスがシリアル化される方法、作成中に機能に加えられた変更は無視されます。

以下は、ビームデータセットの例です。より複雑な実際の例については、 Wikipediaデータセットをご覧ください。

 class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

 

パイプラインを実行する

パイプラインを実行するには、上記のセクションをご覧ください。

 python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset