日付を保存! Google I / Oが5月18日から20日に戻ってきます今すぐ登録
このページは Cloud Translation API によって翻訳されました。
Switch to English

パラメータサーバートレーニング

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示ノートブックをダウンロードする

概要概要

パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメーターサーバートレーニングクラスターは、ワーカーとパラメーターサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。デフォルトでは、ワーカーはこれらの変数を相互に同期せずに個別に読み取り、更新します。これが、パラメータサーバースタイルのトレーニングが非同期トレーニングと呼ばれることがある理由です。

TF2では、パラメーターサーバーのトレーニングはtf.distribute.experimental.ParameterServerStrategyクラスによって強化されます。 tf.distribute.experimental.ParameterServerStrategyクラスは、トレーニングステップを数千人のワーカー(パラメーターサーバーを伴う)にスケールアップするクラスターに分散します。サポートされている主なトレーニングAPIには、 Model.fitとも呼ばれるKeras TrainingAPIとCustomTraining Loop(CTL)の2つがあります。 Model.fitは、ユーザーがトレーニングの高レベルの抽象化と処理を好む場合に推奨されますが、CTLは、ユーザーがトレーニングループの詳細を定義することを好む場合に推奨されます。

選択したAPIに関係なく、TF2の分散トレーニングには、複数の「ジョブ」を含む「クラスター」が含まれ、各ジョブには1つ以上の「タスク」が含まれる場合があります。パラメータサーバートレーニングを使用する場合は、1つのコーディネータージョブ(ジョブ名chief )、複数のワーカージョブ(ジョブ名worker )、および複数のパラメーターサーバージョブ(ジョブ名ps )をpsすることをお勧めします。

コーディネーターがリソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処している間、ワーカーとパラメーターサーバーはコーディネーターからの要求をリッスンするtf.distribute.Serverを実行します。

Model.fit使用したパラメーターサーバーのトレーニング

Model.fit APIを使用したパラメーターサーバーのトレーニングでは、コーディネーターがtf.distribute.experimental.ParameterServerStrategyオブジェクトとtf.keras.utils.experimental.DatasetCreatorを入力として使用する必要があります。ストラテジーなしまたは他のストラテジーありのModel.fit使用法と同様に、ワークフローには、モデルの作成とコンパイル、コールバックの準備、それに続くModel.fit呼び出しが含まれます。

カスタムトレーニングループ(CTL)APIを使用したパラメーターサーバートレーニング

CTLの場合、 tf.distribute.experimental.coordinator.ClusterCoordinatorクラスは、コーディネーターに使用される主要なコンポーネントです。 ClusterCoordinatorクラスは、 tf.distribute.Strategyオブジェクトと連携してtf.distribute.Strategyするtf.distribute.Strategyます。このtf.distribute.Strategyオブジェクトは、クラスターの情報を提供するために必要であり、 MirroredStrategyしたカスタムトレーニングで見たようにトレーニングステップを定義するために使用されます。次に、 ClusterCoordinatorオブジェクトは、これらのトレーニングステップの実行をリモートワーカーにディスパッチします。パラメーターサーバーのトレーニングでは、 ClusterCoordinatortf.distribute.experimental.ParameterServerStrategyする必要があります。

ClusterCoordinatorオブジェクトによって提供される最も重要なAPIはscheduleです。 schedule APIはtf.functionをキューにtf.function 、futureのようなRemoteValueすぐに返します。キューに入れられた関数は、バックグラウンドスレッドのリモートワーカーにディスパッチされ、それらのRemoteValueは非同期で埋められます。 scheduleはワーカーの割り当てを必要としないため、渡されたtf.functionは使用可能な任意のワーカーで実行できます。実行されたワーカーが完了する前に使用できなくなった場合、関数は別の使用可能なワーカーで再試行されます。この事実と関数の実行がアトミックではないという事実のために、関数は複数回実行される可能性があります。

ClusterCoordinatorは、リモート関数のディスパッチに加えて、すべてのワーカーでデータセットを作成し、ワーカーが障害から回復したときにこれらのデータセットを再構築するのにも役立ちます。

チュートリアルのセットアップ

チュートリアルはCTLパスまたはModel.fitパスに分岐し、ニーズに合ったパスを選択できます。 「Xを使用したトレーニング」以外のセクションは、両方のパスに適用できます。

pip install -q portpicker
pip install -q tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

クラスターのセットアップ

上記のように、パラメーターサーバートレーニングクラスターには、トレーニングプログラムを実行するコーディネータータスク、TensorFlowサーバーを実行する1つまたは複数のワーカーとパラメーターサーバータスク、つまりtf.distribute.Server 、および場合によってはtf.distribute.Serverを実行する追加の評価タスクが必要です。評価(以下のサイドカー評価セクションを参照)。それらを設定するための要件は次のとおりです。

  • コーディネータータスクは、エバリュエーターを除く他のすべてのTensorFlowサーバーのアドレスとポートを知る必要があります。
  • ワーカーとパラメーターサーバーは、リッスンする必要のあるポートを知る必要があります。簡単にするために、これらのタスクでTensorFlowサーバーを作成するときは、通常、完全なクラスター情報を渡します。
  • 評価者タスクは、トレーニングクラスターの設定を知る必要はありません。含まれている場合は、トレーニングクラスターへの接続を試行しないでください。
  • ワーカーとパラメーターサーバーには、それぞれ「worker」と「ps」のタスクタイプが必要です。コーディネーターは、従来の理由から、タスクタイプとして「チーフ」を使用する必要があります。

このチュートリアルでは、パラメーターサーバーのトレーニング全体をcolabで実行できるように、インプロセスクラスターを作成します。実際のクラスターを設定する方法については、後のセクションで紹介します。

インプロセスクラスター

このチュートリアルでは、事前に多数のTensorFlowサーバーを起動し、後でそれらに接続します。これはこのチュートリアルのデモンストレーションのみを目的としていることに注意してください。実際のトレーニングでは、サーバーはワーカーマシンとpsマシンで起動されます。

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

インプロセスクラスターセットアップは、単体テストで頻繁に使用されます。これが一例です。

ParameterServerStrategyインスタンス化します

トレーニングコードに飛び込む前に、 ParameterServerStrategyオブジェクトをインスタンス化しましょう。これは、カスタムトレーニングループまたはModel.fitのどちらを続行するかに関係なく必要であることに注意してください。 variable_partitioner引数については、次のセクションで説明します。

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:18923', 'localhost:24530'], 'worker': ['localhost:19174', 'localhost:22565', 'localhost:23430']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:18923', 'localhost:24530'], 'worker': ['localhost:19174', 'localhost:22565', 'localhost:23430']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0

トレーニングにGPUを使用するには、各ワーカーに表示されるGPUを割り当てます。 ParameterServerStrategyは、各ワーカーで使用可能なすべてのGPUを使用しますが、すべてのワーカーが同じ数のGPUを使用できるようにする必要があるという制限があります。

可変シャーディング

変数シャーディングとは、変数を複数の小さな変数に分割することです。これらの小さな変数をシャードと呼びます。可変シャーディングは、これらのシャードにアクセスするときにネットワーク負荷を分散するのに役立つ場合があります。正規変数の計算と保存を複数のパラメーターサーバーに分散することも役立ちます。

変数のシャーディングを有効にするには、 ParameterServerStrategyオブジェクトを作成するときにvariable_partitioner渡すことができます。 variable_partitioner変数が作成されるたびに呼び出されます、変数の各次元に沿った破片の数を返すことが期待されています。 tf.distribute.experimental.partitioners.FixedShardsPartitionerなど、 tf.distribute.experimental.partitioners.FixedShardsPartitioner variable_partitionerがいくつか用意されていtf.distribute.experimental.partitioners.FixedShardsPartitioner

variable_partitionerが渡され、 strategy.scope()直下で変数を作成すると、シャードのリストへのアクセスを提供するvariablesプロパティを持つコンテナータイプになりvariables 。ほとんどの場合、このコンテナはすべてのシャードを連結することで自動的にTensorに変換されます。その結果、正規変数として使用できます。一方、のようないくつかのTensorFlow方法tf.nn.embedding_lookupこのコンテナのタイプのための効率的な実装を提供し、これらの方法で自動連結が回避されます。

詳細については、 ParameterServerStrategyのAPIドキュメント文字列を参照してください。

Model.fitトレーニング

Kerasは、 Model.fitを介して使いやすいトレーニングAPIを提供します。これは、 train_stepでトレーニングループを処理し、オーバーライド可能なtrain_steptrain_stepチェックポイント保存やサマリー保存などの機能を提供するコールバックを備えています。 Model.fit使用すると、ストラテジーオブジェクトを簡単に交換するだけで、同じトレーニングコードを他のストラテジーに使用できます。

入力データ

Model.fitパラメータサーバの訓練では、入力されたデータは、タイプの単一の引数とる呼び出し可能で提供されている必要がありtf.distribute.InputContext 、と返しtf.data.Dataset 。次に、作成tf.keras.utils.experimental.DatasetCreatorようになりますオブジェクトcallableし、オプションのtf.distribute.InputOptionsを経由してオブジェクトをinput_options引数。パラメータサーバートレーニングを使用してデータをシャッフルして繰り返し、ライブラリがエポック境界を認識できるように、 fit呼び出しでsteps_per_epochを指定することをお勧めします。

InputContext引数の詳細については、分散入力ガイドを参照してください。

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fnのコードは、各ワーカーマシンの入力デバイス(通常はCPU)で呼び出されます。

モデルの構築とコンパイル

ここで、選択したAPIを使用してtf.keras.Modelを作成し(ここでは、簡単なtf.keras.models.Sequentialモデルをデモンストレーションとして使用しています)、続いてModel.compile呼び出して、オプティマイザーなどのコンポーネントを組み込みます。メトリック、またはsteps_per_executionなどのパラメーター:

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

コールバックとトレーニング

実際のトレーニングのためにmodel.fitを呼び出す前に、次のような一般的なタスクに必要なコールバックを準備しましょう。

  • ModelCheckpointモデルの重みを保存します。

  • BackupAndRestoreトレーニングの進行状況が自動的にバックアップされ、クラスターで使用不可(アボートやプリエンプションなど)が発生した場合に回復されるようにします。

  • TensorBoard進捗レポートを要約ファイルに保存し、TensorBoardツールで視覚化します。

パフォーマンスを考慮して、カスタムコールバックをParameterServerStrategyとともに使用する場合、バッチレベルのコールバックをオーバーライドできないことに注意してください。カスタムコールバックを変更してエポックレベルの呼び出しにし、 steps_per_epochを適切な値に調整してください。さらに、 steps_per_epochは、 ParameterServerStrategyとともに使用する場合のModel.fit必須引数です。

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
20/20 - 3s - loss: 0.4391
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 2/5
20/20 - 0s - loss: 0.4103
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.3056
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7fefd40207b8> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7fee7553b730> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2992
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.2698
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
<tensorflow.python.keras.callbacks.History at 0x7fefe405c048>

ClusterCoordinator直接使用(オプション)

Model.fitトレーニングパスを選択した場合でも、オプションでClusterCoordinatorオブジェクトをインスタンス化して、ワーカーで実行する他の関数をスケジュールできます。詳細と例については、以下の「カスタムトレーニングループを使用したトレーニング」セクションを参照してください。

カスタムトレーニングループを使用したトレーニング

tf.distribute.Strategyしたカスタムトレーニングループは、トレーニングループを定義するための優れた柔軟性を提供します。上で定義したParameterServerStrategyを使用して、 ClusterCoordinatorを使用して、トレーニングステップの実行をリモートワーカーにディスパッチします。

次に、他のtf.distribute.Strategyしたトレーニングループで見たように、モデルを作成し、データセットとステップ関数を定義します。詳細については、このチュートリアルをご覧ください

効率的なデータセットのプリフェッチを確実にするには、以下のリモートワーカーへディスパッチトレーニング手順に記載されている推奨される分散データセット作成APIを使用してください。また、コールに確認してくださいstrategy.run労働者に割り当てられたのGPUをフルに活用するためにworker_fn内部。残りの手順は、GPUを使用する場合と使用しない場合のトレーニングで同じです。

次の手順でこれらのコンポーネントを作成しましょう。

データを設定する

まず、Keras前処理レイヤーによって実装された前処理ロジックを含むデータセットを作成する関数を記述します。私たちは外で、これらの層を作成しますdataset_fnが、内部で変換を適用dataset_fnあなたがラップされますので、 dataset_fntf.function変数がその中に作成することはできません。

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

データセットにおもちゃの例を生成します。

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

次に、dataset_fnでラップされたトレーニングデータセットを作成します。

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

モデルを構築する

次に、モデルとその他のオブジェクトを作成します。必ずstrategy.scope下にすべての変数を作成してください。

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

FixedShardsPartitioner使用すると、すべての変数が2つのシャードに分割され、各シャードが異なるパラメーターサーバーに割り当てられていることを確認しましょう。

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

トレーニングステップを定義する

第三に、 tf.functionラップされたトレーニングステップを作成します。

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

上記のステップ関数では、 step_fnstrategy.runstrategy.reduceを呼び出すと、ワーカーごとに複数のGPUをサポートできます。ワーカーにGPUが割り当てられている場合、 strategy.runはデータセットを複数のレプリカに分散します。

リモートワーカーへのトレーニング手順のディスパッチ

すべての計算がParameterServerStrategyによって定義された後、 ClusterCoordinatorクラスを使用してリソースを作成し、トレーニング手順をリモートワーカーに配布します。

まず、 ClusterCoordinatorオブジェクトを作成し、戦略オブジェクトを渡します。

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

次に、ワーカーごとのデータセットとイテレーターを作成します。以下のper_worker_dataset_fnでは、GPUへの効率的なプリフェッチをシームレスにper_worker_dataset_fnように、 dataset_fnstrategy.distribute_datasets_from_functionラップすることをお勧めします。

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

最後のステップは、 scheduleを使用してリモートワーカーに計算を分散することです。 scheduleメソッドはtf.functionをキューにtf.function 、futureのようなRemoteValueすぐに返します。キューに入れられた関数はバックグラウンドスレッドでリモートワーカーにディスパッチされ、 RemoteValueは非同期で埋められます。 joinメソッドを使用して、スケジュールされたすべての関数が実行されるまで待機できます。

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.818750.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

RemoteValue結果を取得する方法は次のRemoteValueです。

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.004459

または、すべてのステップを起動して、完了を待っている間に何かを実行することもできます。

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

この特定の例の完全なトレーニングとサービスのワークフローについては、このテストを確認してください。

データセット作成の詳細

上記のコードのデータセットは、 create_per_worker_datasetを使用して作成されていcreate_per_worker_dataset 。ワーカーごとに1つのデータセットを作成し、コンテナーオブジェクトを返します。 iterメソッドを呼び出して、ワーカーごとのイテレーターを作成できます。ワーカーごとのイテレーターには、ワーカーごとに1つのイテレーターが含まれ、特定のワーカーで関数が実行される前に、 scheduleメソッドに渡される関数の入力引数にワーカーの対応するスライスが代入されます。

現在、 scheduleメソッドは、ワーカーが同等であると想定しているため、 dataset.shuffle操作が含まれている場合に異なる方法でシャッフルされる可能性があることを除いて、異なるワーカーのデータセットは同じであると想定しています。このため、データセットのOutOfRangeErrorに依存するのではなく、データセットを無期限に繰り返し、有限数のステップをスケジュールすることもお勧めします。

もう1つの重要な注意点は、 tf.dataデータセットは、タスクの境界を越えた暗黙的なシリアル化と逆シリアル化をサポートしていないことです。したがって、 create_per_worker_dataset渡される関数内にデータセット全体を作成することが重要です。

評価

分散トレーニングで評価ループを定義して実行する方法は複数あります。以下に説明するように、それぞれに独自の長所と短所があります。好みがない場合は、インライン評価方式をお勧めします。

インライン評価

この方法では、コーディネーターはトレーニングと評価を交互に行うため、インライン評価と呼びます。インライン評価にはいくつかの利点があります。たとえば、単一のタスクでは保持できない大規模な評価モデルや評価データセットをサポートできます。別の例として、評価結果を使用して、次のエポックをトレーニングするための決定を下すことができます。

インライン評価を実装するには、次の2つの方法があります。

  • 直接評価-小さなモデルと評価データセットの場合、コーディネーターは、コーディネーターの評価データセットを使用して、分散モデルで直接評価を実行できます。
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • 分散評価-コーディネーターで直接実行することが不可能な大規模なモデルまたはデータセットの場合、コーディネータータスクは、 schedule / joinメソッドを介して評価タスクをワーカーに分散できます。
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

サイドカー評価

もう1つの方法は、サイドカー評価と呼ばれ、チェックポイントを繰り返し読み取り、最新のチェックポイントで評価を実行する専用の評価タスクを作成することです。評価結果に基づいてトレーニングループを変更する必要がない場合は、トレーニングプログラムを早期に終了できます。ただし、評価をトリガーするには、追加の評価タスクと定期的なチェックポイントが必要です。以下は、可能なサイドカー評価ループです。

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

実世界のクラスター

実際の実稼働環境では、すべてのタスクをさまざまなマシンのさまざまなプロセスで実行します。各タスクでクラスター情報を構成する最も簡単な方法は、「TF_CONFIG」環境変数を設定し、 tf.distribute.cluster_resolver.TFConfigClusterResolverを使用して「TF_CONFIG」を解析することです。 「TF_CONFIG」環境変数の一般的な説明については、 分散トレーニングガイドを参照してください。

Kubernetesまたはその他の構成テンプレートを使用してトレーニングタスクを開始する場合、これらのテンプレートですでに「TF_CONFIG」が設定されている可能性があります。

「TF_CONFIG」環境変数を設定します

3つのワーカーと2つのパラメーターサーバーがあるとすると、ワーカー1の「TF_CONFIG」は次のようになります。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

評価者の「TF_CONFIG」は次のようになります。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

上記のエバリュエーターの「TF_CONFIG」文字列の「cluster」部分はオプションです。

すべてのタスクに同じバイナリを使用する場合

単一のバイナリを使用してこれらすべてのタスクを実行する場合は、最初にプログラムをさまざまな役割に分岐させる必要があります。

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

次のコードはTensorFlowサーバーを起動し、待機します。

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

タスクの失敗の処理

労働者の失敗

ClusterCoordinatorまたはModel.fitは、ワーカーの障害に対する組み込みのフォールトトレランスを提供します。ワーカーが回復すると、以前に提供されたデータセット関数(CTLの場合はcreate_per_worker_datasetModel.fit場合はDatasetCreator )がワーカーで呼び出され、データセットが再作成されます。

パラメータサーバーまたはコーディネータの障害

ただし、コーディネーターがパラメーターサーバーエラーを検出すると、すぐにUnavailableErrorまたはAbortedErrorします。この場合、コーディネーターを再起動できます。コーディネーター自体も利用できなくなる可能性があります。したがって、トレーニングの進行状況を失わないようにするために、特定のツールをお勧めします。

  • Model.fit場合、進行状況の保存と復元を自動的に処理するBackupAndRestoreコールバックを使用する必要があります。例については、上記のコールバックとトレーニングのセクションを参照してください。

  • CTLの場合、トレーニングを開始する前に、モデル変数を定期的にチェックポイントし、チェックポイントがある場合はチェックポイントからモデル変数をロードする必要があります。オプティマイザーがチェックポイントoptimizer.iterations場合、トレーニングの進行状況は、optimizer.iterationsからおおよそ推測できます。

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue取得

関数が正常に実行された場合、 RemoteValueフェッチは成功することが保証されています。これは、現在、関数の実行後すぐに戻り値がコーディネーターにコピーされるためです。コピー中にワーカー障害が発生した場合、関数は別の使用可能なワーカーで再試行されます。したがって、パフォーマンスを最適化する場合は、戻り値なしで関数をスケジュールできます。

エラー報告

コーディネーターは、パラメーターサーバーからのUnavailableErrorなどのエラー、またはtf.debugging.check_numericsからのInvalidArgumentなどの他のアプリケーションエラーをtf.debugging.check_numericsすると、エラーを発生させる前に、保留中およびキューに入れられたすべての関数をキャンセルします。対応するRemoteValueフェッチすると、 CancelledErrorます。

エラーが発生した後、コーディネーターは同じエラーまたはキャンセルされた関数からのエラーを発生させません。

パフォーマンスの向上

ParameterServerStrategyClusterResolverを使用してトレーニングするときにパフォーマンスの問題が発生する場合は、いくつかの理由が考えられClusterResolver

一般的な理由の1つは、パラメーターサーバーの負荷が不均衡であり、一部の高負荷のパラメーターサーバーが容量に達していることです。複数の根本原因も考えられます。この問題を軽減するためのいくつかの簡単な方法は次のとおりです。

  1. 指定経由で大規模なモデル変数をシャードvariable_partitioner構築する際にParameterServerStrategy
  2. 可能であれば、すべてのパラメーターサーバーで必要なホットスポット変数を1つのステップで作成することは避けてください。たとえば、オプティマイザーで一定の学習率またはサブクラスtf.keras.optimizers.schedules.LearningRateScheduleします。これは、デフォルトの動作では、学習率が特定のパラメーターサーバーに配置され、各ステップで他のすべてのパラメーターサーバーによって要求される変数になるためです。 。
  3. 大きな語彙をシャッフルしてから、Keras前処理レイヤーに渡します。

パフォーマンスの問題のもう1つの考えられる理由は、コーディネーターです。 schedule / join最初の実装はPythonベースであるため、スレッドのオーバーヘッドが発生する可能性があります。また、コーディネーターとワーカーの間の待ち時間が長くなる可能性があります。このような場合は、

  • Model.fit 、あなたが設定することができsteps_per_executionに設けた引数Model.compile 1よりも大きな値にします。

  • CTLの場合、複数のステップを1つのtf.functionパックできます。

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

ライブラリの最適化を継続しているため、将来、ほとんどのユーザーが手動でステップをパックする必要がないことを願っています。

さらに、パフォーマンスを向上させるための小さなトリックは、上記のタスク障害の処理のセクションで説明したように、戻り値なしで関数をスケジュールすることです。

既知の制限

既知の制限のほとんどは、上記のセクションで説明されています。このセクションでは、要約を提供します。

ParameterServerStrategy一般

  • フォールトトレランスを適切に機能させるには、コーディネーターを含むすべてのタスクでos.environment["grpc_fail_fast"]="use_caller"が必要です。
  • 同期パラメータサーバートレーニングはサポートされていません。
  • 通常、最適なパフォーマンスを実現するには、複数のステップを1つの関数にまとめる必要があります。
  • シャーディングされた変数を含むtf.saved_model.loadを介してtf.saved_model.loadをロードすることはサポートされていません。 TensorFlowServingを使用してこのようなsaved_modelをロードすることは機能することが期待されていることに注意してください。
  • シャードオプティマイザースロット変数を含むチェックポイントを異なる数のシャードにロードすることはサポートされていません。
  • コーディネータータスクを再起動せずにパラメーターサーバーの障害から回復することはサポートされていません。

Model.fit詳細

  • steps_per_epoch引数が必要であるModel.fit 。エポックで適切な間隔を提供する値を選択できます。
  • ParameterServerStrategyは、パフォーマンス上の理由からバッチレベルの呼び出しがあるカスタムコールバックをサポートしていません。これらの呼び出しは、適切に選択されたsteps_per_epochを使用してエポックレベルの呼び出しに変換する必要があります。これにより、 steps_per_epochのステップ数ごとにsteps_per_epochます。組み込みのコールバックは影響を受けません。バッチレベルの呼び出しは、パフォーマンスが向上するように変更されています。 ParameterServerStrategyバッチレベルの呼び出しのサポートが計画されています。
  • 同じ理由で、他の戦略とは異なり、プログレスバーとメトリックはエポック境界でのみログに記録されます。
  • Model.fit入力は、 DatasetCreatorタイプのみを取ります。
  • run_eagerlyはサポートされていません。
  • Model.fitでの評価はまだサポートされていません。これは優先事項の1つです。
  • Model.evaluateModel.predictはまだサポートされていません。

カスタムトレーニングループの詳細