分散トレーニング

分散トレーニングは、コンピューティング リソース要件 (CPU、RAM など) が複数のコンピューターに分散されるモデル トレーニングの一種です。分散トレーニングにより、より大規模なデータセット (最大数十億のサンプル) でトレーニングを行うことができます。

分散トレーニングは、複数のモデルを並行してトレーニングする自動ハイパーパラメーター最適化にも役立ちます。

このドキュメントでは、次の方法を学習します。

  • 分散トレーニングを使用して TF-DF モデルをトレーニングします。
  • 分散トレーニングを使用して TF-DF モデルのハイパーパラメーターを調整します。

制限事項

現時点では、分散トレーニングは以下に対してサポートされています。

  • tfdf.keras.DistributedGradientBoostedTreesModelを使用して勾配ブースト ツリー モデルをトレーニングします。分散勾配ブースト ツリー モデルは、非分散モデルと同等です。
  • あらゆる TF-DF モデル タイプのハイパーパラメータ検索。

分散トレーニングを有効にする方法

このセクションでは、分散トレーニングを有効にする手順を示します。完全な例については、次のセクションを参照してください。

ParameterServerStrategy スコープ

モデルとデータセットは、 ParameterServerStrategyスコープで定義されます。

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

データセット形式

非分散トレーニングと同様に、データセットは次のように提供できます。

  1. 有限テンソルフロー分散データセット、または
  2. 互換性のあるデータセット形式のいずれかを使用したデータセット ファイルへのパス。

シャード ファイルの使用は、有限テンソルフロー分散データセット アプローチを使用するよりも大幅に簡単です (1 行対約 20 行のコード)。ただし、TensorFlow の前処理をサポートするのは、tensorflow データセットのアプローチのみです。パイプラインに前処理が含まれていない場合は、シャード データセット オプションをお勧めします。

どちらの場合も、データセットの読み取りを効率的に分散するには、データセットを複数のファイルにシャード化する必要があります。

ワーカーのセットアップ

主なプロセスは、TensorFlow モデルを定義する Python コードを実行するプログラムです。このプロセスでは負荷の高い計算は実行されません。効果的なトレーニングの計算はワーカーによって行われます。ワーカーは、TensorFlow パラメータ サーバーを実行するプロセスです。

チーフにはワーカーの IP アドレスを設定する必要があります。これは、 TF_CONFIG環境変数を使用するか、 ClusterResolverを作成することで実行できます。詳細については、「ParameterServerStrategy を使用したパラメーター サーバーのトレーニング」を参照してください。

TensorFlow の ParameterServerStrategy は、「ワーカー」と「パラメータ サーバー」という 2 種類のワーカーを定義します。 TensorFlow では、各タイプのワーカーを少なくとも 1 つインスタンス化する必要があります。ただし、TF-DF では「ワーカー」のみを使用します。したがって、1 つの「パラメータ サーバー」をインスタンス化する必要がありますが、TF-DF では使用されません。たとえば、TF-DF トレーニングの構成は次のようになります。

  • 1 チーフ
  • 50人の労働者
  • 1 パラメータサーバー

ワーカーは TensorFlow Decision Forests のカスタム トレーニング オペレーションにアクセスする必要があります。アクセスを有効にするには 2 つのオプションがあります。

  1. 事前構成された TF-DF C++ パラメーター サーバー//third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_serverを使用します。
  2. tf.distribute.Server()を呼び出してパラメータ サーバーを作成します。この場合、TF-DF をインポートする必要がありますimport tensorflow_decision_forests

このセクションでは、分散トレーニング構成の完全な例を示します。その他の例については、 TF-DF 単体テストを確認してください。

例: データセット パスでの分散トレーニング

互換性のあるデータセット形式のいずれかを使用して、データセットを一連のシャーディング ファイルに分割します。ファイルに次のような名前を付けることをお勧めします: /path/to/dataset/train-<5 digit index>-of-<total files> 、たとえば

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

効率を最大限に高めるには、ファイル数はワーカー数の少なくとも 10 倍である必要があります。たとえば、100 人のワーカーでトレーニングしている場合は、データセットが少なくとも 1000 個のファイルに分割されていることを確認してください。

ファイルは、次のようなシャーディング式を使用して参照できます。

  • /path/to/dataset/train@1000
  • /path/to/dataset/train@*

分散トレーニングは次のように行われます。この例では、データセットは TensorFlow Examples (キーtfrecord+tfeで定義) の TFRecord として保存されます。

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

例: 有限の TensorFlow 分散データセットでの分散トレーニング

TF-DF は、分散された有限のワーカーでシャーディングされた TensorFlow データセットを想定しています。

  • Distributed : 非分散データセットは、 strategy.distribute_datasets_from_functionでラップされます。
  • finite : データセットは各サンプルを 1 回だけ読み取る必要があります。データセットにはrepeat命令を含めないでください。
  • work-sharded : 各ワーカーはデータセットの個別の部分を読み取る必要があります。

以下に例を示します。

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

例: データセット パスでの分散ハイパーパラメータ調整

データセット パスでの分散ハイパーパラメータ調整は、分散トレーニングに似ています。唯一の違いは、このオプションが非配布モデルと互換性があることです。たとえば、(非分散) Gradient Boosted Trees モデルのハイパーパラメータ調整を分散できます。

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

例: 単体テスト

分散トレーニングを単体テストするために、モック ワーカー プロセスを作成できます。詳細については、TF-DF 単体テストのメソッド_create_in_process_tf_ps_clusterを参照してください。