新しい!スプレッドシート用のシンプルな ML を使用して、Google スプレッドシートのデータに機械学習を適用します続きを読む

分散トレーニング

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

分散トレーニングは、コンピューティング リソース要件 (CPU、RAM など) が複数のコンピューターに分散されるモデル トレーニングの一種です。分散トレーニングにより、より高速に、より大きなデータセット (最大数十億の例) でトレーニングできます。

分散トレーニングは、複数のモデルが並行してトレーニングされる自動ハイパーパラメーター最適化にも役立ちます。

このドキュメントでは、次の方法について学習します。

  • 分散トレーニングを使用してモデルをトレーニングします。
  • 分散トレーニングを使用してモデルのハイパーパラメーターを調整します。

制限事項

現在、分散トレーニングは以下によってサポートされています。

  • 分散型勾配ブースティング ツリー モデル。このモデルは、非分散勾配ブースト ツリー モデルと同等です。
  • 自動化されたハイパーパラメーター チューナーを使用する任意のモデル

分散トレーニングを有効にする方法

このセクションでは、分散トレーニングを有効にする手順を示します。完全な例については、次のセクションを参照してください。

ParameterServerStrategy スコープ

モデルとデータセットはParameterServerStrategyスコープで定義されます。

strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()
  distributed_train_dataset = strategy.distribute_datasets_from_function(dataset_fn)
model.fit(distributed_train_dataset)

データセット形式

非分散型トレーニングと同様に、データセットは次のように提供できます。

  1. 有限テンソルフロー分散データセット、または
  2. 互換性のあるデータセット形式のいずれかを使用したデータセット ファイルへのパス。

シャード ファイルの使用は、有限テンソルフロー分散データセット アプローチを使用するよりも大幅に簡単です (1 行対 ~20 行のコード)。ただし、TensorFlow データセット アプローチのみが TensorFlow 前処理をサポートしています。パイプラインに前処理が含まれていない場合は、分割されたデータセット オプションをお勧めします。

どちらの場合も、データセットの読み取りを効率的に分散するために、データセットを複数のファイルに分割する必要があります。

ワーカーのセットアップ

主なプロセスは、TensorFlow モデルを定義する Python コードを実行するプログラムです。このプロセスは重い計算を実行していません。効果的なトレーニング計算はworkerによって行われます。ワーカーは、TensorFlow パラメータ サーバーを実行するプロセスです。

チーフは、ワーカーの IP アドレスで構成する必要があります。これは、 TF_CONFIG環境変数を使用するか、 ClusterResolverを作成することで実行できます。詳細については、ParameterServerStrategy を使用したパラメーター サーバーのトレーニングを参照してください。

TensorFlow の ParameterServerStrategy は、「ワーカー」と「パラメーター サーバー」という 2 つのタイプのワーカーを定義します。 TensorFlow では、各タイプのワーカーを少なくとも 1 つインスタンス化する必要があります。 TF-DF は「ワーカー」のみを使用します。そのため、1 つの「パラメーター サーバー」をインスタンス化する必要がありますが、TF-DF では使用されません。たとえば、TF-DF トレーニングの構成は次のようになります。

  • 1 チーフ
  • 50人の労働者
  • 1 パラメータサーバー

ワーカーは、TensorFlow デシジョン フォレストのカスタム トレーニング ops にアクセスできます。そのためには、次の 2 つのオプションがあります。

  1. 事前構成された TF-DF C++ パラメータ サーバー//third_party/tensorflow_decision_forests/tensorflow/distribute:tensorflow_std_serverを使用します。
  2. tf.distribute.Server()を呼び出してパラメーター サーバーを作成します。この場合、TF-DF をimport tensorflow_decision_forestsにインポートする必要があります。

このセクションでは、分散トレーニング構成の完全な例を示します。その他の例については、 TF-DF 単体テストを確認してください。

例: データセット パスでの分散トレーニング

互換性のあるデータセット形式のいずれかを使用して、データセットをシャード ファイルのセットに分割します。次のようにファイルに名前を付けることが推奨されます: /path/to/dataset/train-<5 digit index>-of-<total files> .

例えば:

/path/to/dataset/train-00000-of-00100
/path/to/dataset/train-00001-of-00005
/path/to/dataset/train-00002-of-00005
...

効率を最大にするには、ファイル数を少なくともワーカー数の 10 倍にする必要があります。たとえば、100 人のワーカーでトレーニングしている場合は、データセットが少なくとも 1000 個のファイルに分割されていることを確認してください。

ファイルは、次のようなシャーディング式で参照できます。

  • /path/to/dataset/train@1000
  • /path/to/dataset/train@*

分散トレーニングは次のように行われます。この例では、データセットは TensorFlow Examples の TFRecord として保存されます (キーtfrecord+tfeによって定義されます)。

import tensorflow_decision_forests as tfdf
import tensorflow as tf

strategy = tf.distribute.experimental.ParameterServerStrategy(...)

with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

model.fit_on_dataset_path(
    train_path="/path/to/dataset/train@1000",
    label_key="label_key",
    dataset_format="tfrecord+tfe")

print("Trained model")
model.summary()

例: 有限の TensorFlow 分散データセットでの分散トレーニング

TF-DF は、分散された有限のワーカー シャード TensorFlow データセットを想定しています。

  • Distributed : 非分散データセットはstrategy.distribute_datasets_from_functionにラップされます。
  • 有限: データセットは例を 1 回だけ読み取る必要があります。データセットには、 repeatの指示を含めないでください。
  • worker-sharded : 各ワーカーは、データセットの個別の部分を読み取る必要があります。

次に例を示します。

import tensorflow_decision_forests as tfdf
import tensorflow as tf


def dataset_fn(context, paths):
  """Create a worker-sharded finite dataset from paths.

  Like for non-distributed training, each example should be visited exactly
  once (and by only one worker) during the training. In addition, for optimal
  training speed, the reading of the examples should be distributed among the
  workers (instead of being read by a single worker, or read and discarded
  multiple times).

  In other words, don't add a "repeat" statement and make sure to shard the
  dataset at the file level and not at the example level.
  """

  # List the dataset files
  ds_path = tf.data.Dataset.from_tensor_slices(paths)

  # Make sure the dataset is used with distributed training.
  assert context is not None


  # Split the among the workers.
  #
  # Note: The "shard" is applied on the file path. The shard should not be
  # applied on the examples directly.
  # Note: You cannot use 'context.num_input_pipelines' with ParameterServerV2.
  current_worker = tfdf.keras.get_worker_idx_and_num_workers(context)
  ds_path = ds_path.shard(
      num_shards=current_worker.num_workers,
      index=current_worker.worker_idx)

  def read_csv_file(path):
    """Reads a single csv file."""

    numerical = tf.constant([0.0], dtype=tf.float32)
    categorical_string = tf.constant(["NA"], dtype=tf.string)
    csv_columns = [
        numerical,  # feature 1
        categorical_string,  # feature 2
        numerical,  # feature 3
        # ... define the features here.
    ]
    return tf.data.experimental.CsvDataset(path, csv_columns, header=True)

  ds_columns = ds_path.interleave(read_csv_file)

  # We assume a binary classification label with the following possible values.
  label_values = ["<=50K", ">50K"]

  # Convert the text labels into integers:
  # "<=50K" => 0
  # ">50K" => 1
  init_label_table = tf.lookup.KeyValueTensorInitializer(
      keys=tf.constant(label_values),
      values=tf.constant(range(label_values), dtype=tf.int64))
  label_table = tf.lookup.StaticVocabularyTable(
      init_label_table, num_oov_buckets=1)

  def extract_label(*columns):
    return columns[0:-1], label_table.lookup(columns[-1])

  ds_dataset = ds_columns.map(extract_label)

  # The batch size has no impact on the quality of the model. However, a larger
  # batch size generally is faster.
  ds_dataset = ds_dataset.batch(500)
  return ds_dataset


strategy = tf.distribute.experimental.ParameterServerStrategy(...)
with strategy.scope():
  model = tfdf.keras.DistributedGradientBoostedTreesModel()

  train_dataset = strategy.distribute_datasets_from_function(
      lambda context: dataset_fn(context, [...list of csv files...])
  )

model.fit(train_dataset)

print("Trained model")
model.summary()

例: データセット パスでの分散ハイパーパラメーター調整

データセット パスでの分散ハイパーパラメーター調整は、分散トレーニングに似ています。唯一の違いは、このオプションが非分散モデルと互換性があることです。たとえば、(非分散) 勾配ブースティング ツリー モデルのハイパーパラメーター調整を分散できます。

with strategy.scope():
  tuner = tfdf.tuner.RandomSearch(num_trials=30, use_predefined_hps=True)
  model = tfdf.keras.GradientBoostedTreesModel(tuner=tuner)

training_history = model.fit_on_dataset_path(
  train_path=train_path,
  label_key=label,
  dataset_format="csv",
  valid_path=test_path)

logging.info("Trained model:")
model.summary()

例: 単体テスト

分散トレーニングの単体テストを行うために、モック ワーカー プロセスを作成できます。 TF-DF 単体テストのメソッド_create_in_process_tf_ps_clusterを参照してください