Google I / Oの基調講演、製品セッション、ワークショップなどを見るプレイリストを見る

Apache Beam でビッグデータセットを生成する

データセットによっては、1 台のマシンで処理するには大きすぎるものがあります。tfdsは、Apache Beam を使用することによって、多くのマシンにまたがったデータ生成のサポートをします。

このドキュメントには、2 つのセクションがあります。

  • 既存の Beam のデータセットを生成するユーザー向け
  • 新規の Beam のデータセットを作成する開発者向け

目次 :

Beam のデータセットを生成する

クラウドとローカルで Beam のデータセットを生成する例を以下に紹介します。

警告: tensorflow_datasets.scripts.download_and_prepareスクリプトでデータセットを生成する際には、生成するデータセットの設定を必ず指定しなければなりません。例えば、wikipedia の場合は、--dataset=wikipediaの代わりに--dataset=wikipedia/20200301.enを使用します。

Google Cloud Dataflow で生成する

Google Cloud Dataflow を使用してパイプラインを実行し、分散計算を活用できるようにするには、まずクイックスタートの手順に従います。

環境の設定ができたら、GCS のデータディレクトリを使用してdownload_and_prepareスクリプトを実行し、--beam_pipeline_optionsフラグに必要なオプションを指定します。

スクリプトの起動を容易にするためには、GCP/GCS のセットアップと生成するデータセットの実際の値を使用して、以下の変数を定義すると有用です。

DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket

次に、ワーカーにtfdsをインストールするよう Dataflow に指示をするファイルを作成する必要があります。

echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt

tfds-nightlyを使用している場合には、データセットが前回のリリースから更新されている場合に備え、tfds-nightlyからエコーするようにします。

echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt

最後に、以下のコマンドを使用してジョブを起動します。

python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=$DATASET_NAME/$DATASET_CONFIG \
  --data_dir=$GCS_BUCKET/tensorflow_datasets \
  --beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"

ローカルで生成する

デフォルトの Apache Beam ランナーを使用してローカルでスクリプトを実行する場合、コマンドは他のデータセットの場合と同じです。

python -m tensorflow_datasets.scripts.download_and_prepare \
  --datasets=my_new_dataset

警告: Beam のデータセットは巨大(テラバイト級)な場合があり、生成にはかなりの量のリソースを必要とします(ローカルコンピュータでは数週間かかることもあります)。データセットの生成には分散環境の使用を推奨しています。サポートされているランタイムのリストについては Apache Beam ドキュメントを参照してください。

カスタムスクリプト内で生成する

Beam でデータセットを生成する場合、API は他のデータセットの場合と同じですが、Beam のオプションかランナーをDownloadConfigに渡す必要があります。

# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]

# To use Beam, you have to set at least one of `beam_options` or `beam_runner`
dl_config = tfds.download.DownloadConfig(
    beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)

data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(
    download_dir=FLAGS.download_dir,
    download_config=dl_config,
)

Beam のデータセットを実装する

必要な準備

Apache Beam のデータセットを書き込むにあたり、以下の概念を理解しておく必要があります。

手順

データセット作成ガイドに精通している場合は、少し変更を加えるだけで Beam のデータセットを追加することができます。

  • DatasetBuilderは、tfds.core.GeneratorBasedBuilderの代わりにtfds.core.BeamBasedBuilderを継承します。
  • Beam のデータセットは、メソッド_generate_examples(self, **kwargs)の代わりに抽象メソッド_build_pcollection(self, **kwargs)を実装します。_build_pcollectionは、スプリットに関連付けられた例を含むbeam.PCollectionを返します。
  • Beam のデータセットの単体テストの記述は、他のデータセットの場合と同じです。

その他の考慮事項 :

  • Apache Beam のインポートには、tfds.core.lazy_importsを使用します。遅延依存関係を使用すると、ユーザーは Beam をインストールしなくても、生成された後のデータセットを読むことができます。
  • Python のクロージャには注意が必要です。パイプラインを実行すると、beam.Map関数とbeam.DoFn関数はpickleを使用してシリアライズされ、すべてのワーカーに送信されます。これはバグを発生させる可能性があります。例えば、関数の外部で宣言された可変オブジェクトを関数内で使用している場合、pickleエラーや予期せぬ動作が発生する場合があります。一般的な解決策は、関数閉包を変更しないようにすることです。
  • Beam のパイプラインにDatasetBuilderのメソッドを使用することは問題ありません。しかし、pickle 中にクラスをシリアライズする方法では、作成中に加えられた特徴の変更は無視されます。

以下は Beam のデータセットの例です。より複雑な実例については、Wikipediaデータセットをご覧ください。

class DummyBeamDataset(tfds.core.BeamBasedBuilder):

  VERSION = tfds.core.Version('1.0.0')

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        features=tfds.features.FeaturesDict({
            'image': tfds.features.Image(shape=(16, 16, 1)),
            'label': tfds.features.ClassLabel(names=['dog', 'cat']),
        }),
    )

  def _split_generators(self, dl_manager):
    ...
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            gen_kwargs=dict(file_dir='path/to/train_data/'),
        ),
        splits_lib.SplitGenerator(
            name=tfds.Split.TEST,
            gen_kwargs=dict(file_dir='path/to/test_data/'),
        ),
    ]

  def _build_pcollection(self, pipeline, file_dir):
    """Generate examples as dicts."""
    beam = tfds.core.lazy_imports.apache_beam

    def _process_example(filename):
      # Use filename as key
      return filename, {
          'image': os.path.join(file_dir, filename),
          'label': filename.split('.')[1],  # Extract label: "0010102.dog.jpeg"
      }

    return (
        pipeline
        | beam.Create(tf.io.gfile.listdir(file_dir))
        | beam.Map(_process_example)
    )

パイプラインの実行

パイプラインの実行には、上記のセクションをご覧ください。

警告: 初めてデータセットを実行してダウンロードを登録する際には、download_and_prepareスクリプトにレジスタチェックサム--register_checksumsフラグの追加を忘れないようにしてください。

python -m tensorflow_datasets.scripts.download_and_prepare \
  --register_checksums \
  --datasets=my_new_dataset