![]() | ![]() | ![]() | ![]() |
概要概要
パラメータサーバートレーニングは、複数のマシンでモデルトレーニングをスケールアップするための一般的なデータ並列方法です。パラメーターサーバートレーニングクラスターは、ワーカーとパラメーターサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。デフォルトでは、ワーカーはこれらの変数を相互に同期せずに個別に読み取り、更新します。これが、パラメータサーバースタイルのトレーニングが非同期トレーニングと呼ばれることがある理由です。
TensorFlow 2パラメーターサーバーのトレーニングでは、 tf.distribute.experimental.coordinator.ClusterCoordinator
クラスを介して中央コーディネーターを使用します。
この実装では、 worker
タスクとparameter server
タスクは、コーディネーターからの要求をリッスンするtf.distribute.Server
を実行します。コーディネーターは、リソースを作成し、トレーニングタスクをディスパッチし、チェックポイントを書き込み、タスクの失敗に対処します。
このアーキテクチャと新しいClusterCoordinator
クラスは、より柔軟でシンプルなプログラミングモデルを提供すると信じています。
ClusterCoordinator
ClusterCoordinator
クラスは、 tf.distribute.Strategy
オブジェクトと連携してtf.distribute.Strategy
するtf.distribute.Strategy
ます。このtf.distribute.Strategy
オブジェクトは、クラスターの情報を渡すために必要であり、 MirroredStrategy
したカスタムトレーニングで見たように、トレーニングステップを定義するために使用されます。次に、 ClusterCoordinator
オブジェクトは、これらのトレーニングステップの実行をリモートワーカーにディスパッチします。現在、 ClusterCoordinator
はtf.distribute.experimental.ParameterServerStrategy
のみ機能しtf.distribute.experimental.ParameterServerStrategy
。
ClusterCoordinator
オブジェクトによって提供される最も重要なAPIはschedule
です。 schedule
APIはtf.function
をキューにtf.function
、futureのようなRemoteValue
すぐに返します。キューに入れられた関数はバックグラウンドスレッドのリモートワーカーにディスパッチされ、それらのRemoteValue
は非同期で埋められます。 schedule
はワーカーの割り当てを必要としないため、渡されたtf.function
は使用可能な任意のワーカーで実行できます。実行されたワーカーが完了する前に使用できなくなった場合、関数は別の使用可能なワーカーで再試行されます。この事実と関数の実行がアトミックではないという事実のために、関数は複数回実行される可能性があります。
ClusterCoordinator
は、リモート関数のディスパッチに加えて、すべてのワーカーでデータセットを作成し、ワーカーが障害から回復したときにこれらのデータセットを再構築するのにも役立ちます。
チュートリアルのセットアップ
pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl
クラスターのセットアップ
上記のように、パラメーターサーバートレーニングクラスターには、トレーニングプログラムを実行するコーディネータータスク、TensorFlowサーバーを実行する1つまたは複数のワーカーとパラメーターサーバータスク、つまりtf.distribute.Server
、および場合によってはtf.distribute.Server
を実行する追加の評価タスクが必要です。評価(下記のサイドカー評価セクションを参照)。それらを設定するための要件は次のとおりです。
- コーディネータータスクは、エバリュエーターを除く他のすべてのTensorFlowサーバーのアドレスとポートを知る必要があります。
- ワーカーとパラメーターサーバーは、リッスンする必要のあるポートを知る必要があります。簡単にするために、これらのタスクでTensorFlowサーバーを作成するときは、通常、完全なクラスター情報を渡します。
- 評価者タスクは、トレーニングクラスターの設定を知る必要はありません。含まれている場合は、トレーニングクラスターへの接続を試行しないでください。
- ワーカーとパラメーターサーバーには、それぞれ「worker」と「ps」のタスクタイプが必要です。コーディネーターは、従来の理由から、タスクタイプとして「チーフ」を使用する必要があります。
このチュートリアルでは、パラメーターサーバーのトレーニング全体をcolabで実行できるように、インプロセスクラスターを作成します。実際のクラスターを設定する方法については、後のセクションで紹介します。
インプロセスクラスター
このチュートリアルでは、事前に多数のTensorFlowサーバーを起動し、後でそれらに接続します。
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec, job_name="worker", task_index=i, config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec, job_name="ps", task_index=i, protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
カスタムトレーニングループを使用したトレーニング
tf.distribute.Strategy
したカスタムトレーニングループは、トレーニングループを定義するための優れた柔軟性を提供します。現在、TensorFlow 2でのパラメーターサーバートレーニングでは、カスタムトレーニングループのみがサポートされています。ここでは、 ParameterServerStrategy
を使用してトレーニングステップを定義してから、 ClusterCoordinator
を使用してトレーニングステップの実行をリモートワーカーにディスパッチします。
ParameterServerStrategy
作成します
カスタムトレーニングループでトレーニングステップを作成するための最初のステップは、 ParameterServerStrategy
を作成することです。 variable_partitioner
については後で説明しvariable_partitioner
。
variable_partitioner = (
tf.distribute.experimental.partitioners.FixedShardsPartitioner(
num_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0' INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
次に、他のtf.distribute.Strategy
したトレーニングループで見たように、モデルを作成し、データセットとステップ関数を定義します。詳細については、このチュートリアルをご覧ください。次の手順でこれらのコンポーネントを作成しましょう。
データを設定する
まず、Keras前処理レイヤーによって実装された前処理ロジックを含むデータセットを作成する関数を記述します。私たちは外で、これらの層を作成しますdataset_fn
が、内部で変換を適用dataset_fn
あなたがラップされますので、 dataset_fn
にtf.function
変数がその中に作成することはできません。
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
"wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)
label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = keras.layers.Input(
shape=(3,), dtype=tf.string, name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = keras.Model(
{"features": raw_feature_input}, feature_id_input)
raw_label_input = keras.layers.Input(
shape=(1,), dtype=tf.string, name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)
データセットにおもちゃの例を生成します。
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
次に、dataset_fnでラップされたトレーニングデータセットを作成します。
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
モデルを構築する
次に、モデルとその他のオブジェクトを作成します。必ずstrategy.scope
下にすべての変数を作成してください。
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with KPLs.
model_input = keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = keras.Model({"features": model_input}, dense_output)
optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = keras.metrics.Accuracy()
トレーニングステップを定義する
第三に、 tf.function
ラップされたトレーニングステップを作成します。
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
上記のstep関数では、 step_fn
でstrategy.run
とstrategy.reduce
を呼び出すと、現時点では簡単な実装ですが、将来的にGPUまたは複数のレプリカワーカーをサポートするのに役立ちます。
リモートワーカーへのトレーニング手順のディスパッチ
すべての計算がParameterServerStrategy
によって定義された後、 ClusterCoordinator
クラスを使用してリソースを作成し、トレーニング手順をリモートワーカーに配布します。
まず、 ClusterCoordinator
オブジェクトを作成し、戦略オブジェクトを渡します。
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
次に、ワーカーごとのデータセットとイテレーターを作成します。以下のper_worker_dataset_fn
では、 dataset_fn
をstrategy.distribute_datasets_from_function
ラップすることはオプションですが、GPUがParameterServerStrategy
によってサポートされる場合、将来的にGPUへの効率的なプリフェッチをシームレスにサポートできるようになります。
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
最後のステップは、 schedule
を使用して計算をリモートワーカーに分散することです。 schedule
メソッドはtf.function
をキューにtf.function
、futureのようなRemoteValue
すぐに返します。キューに入れられた関数はバックグラウンドスレッドでリモートワーカーにディスパッチされ、 RemoteValue
は非同期で埋められます。 join
メソッドを使用して、スケジュールされたすべての関数が実行されるまで待機できます。
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',). Finished epoch 0, accuracy is 0.462500. Finished epoch 1, accuracy is 0.925000. Finished epoch 2, accuracy is 1.000000. Finished epoch 3, accuracy is 1.000000.
RemoteValue
結果を取得する方法は次のRemoteValue
です。
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665
または、すべてのステップを起動して、完了を待っている間に何かを実行することもできます。
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
この特定の例の完全なトレーニングとサービスのワークフローについては、このテストを確認してください。
データセット作成の詳細
上記のコードのデータセットは、 create_per_worker_dataset
を使用して作成されていcreate_per_worker_dataset
。ワーカーごとに1つのデータセットを作成し、コンテナーオブジェクトを返します。その上でiter
メソッドを呼び出して、ワーカーごとのイテレーターを作成できます。ワーカーごとのイテレーターには、ワーカーごとに1つのイテレーターが含まれ、特定のワーカーで関数が実行される前に、 schedule
メソッドに渡される関数の入力引数にワーカーの対応するスライスが代入されます。
現在、 schedule
メソッドは、ワーカーが同等であると想定しているため、 dataset.shuffle操作が含まれている場合に異なる方法でシャッフルされる可能性があることを除いて、異なるワーカーのデータセットは同じであると想定しています。このため、データセットのOutOfRangeError
に依存するのではなく、データセットを無期限に繰り返し、有限数のステップをスケジュールすることもお勧めします。
もう1つの重要な注意点は、 tf.data
データセットは、タスクの境界を越えた暗黙的なシリアル化と逆シリアル化をサポートしていないことです。したがって、 create_per_worker_dataset
渡される関数内にデータセット全体を作成することが重要です。
可変シャーディング
変数シャーディングとは、変数を複数の小さな変数に分割することです。これらの小さな変数をシャードと呼びます。可変シャーディングは、これらのシャードにアクセスするときにネットワーク負荷を分散するのに役立つ場合があります。正規変数の計算と保存を複数のパラメーターサーバーに分散することも役立ちます。
変数のシャーディングを有効にするには、 ParameterServerStrategy
オブジェクトを作成するときにvariable_partitioner
渡すことができます。 variable_partitioner
変数が作成されるたびに呼び出されます、変数の各次元に沿った破片の数を返すことが期待されています。 tf.distribute.experimental.partitioners.FixedShardsPartitioner
など、 tf.distribute.experimental.partitioners.FixedShardsPartitioner
variable_partitioner
がいくつか用意されていtf.distribute.experimental.partitioners.FixedShardsPartitioner
。
上記の例では、 FixedShardsPartitioner
を使用して、すべての変数を2つのシャードに分割し、各シャードを異なるパラメーターサーバーに割り当てます。
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
variable_partitioner
が渡され、 strategy.scope()
直下で変数を作成すると、シャードのリストへのアクセスを提供するvariables
プロパティを持つコンテナータイプになりvariables
。ほとんどの場合、このコンテナーは、すべてのシャードを連結することによって自動的にTensorに変換されます。その結果、正規変数として使用できます。一方、のようないくつかのTensorFlow方法tf.nn.embedding_lookup
このコンテナのタイプのための効率的な実装を提供し、これらの方法で自動連結が回避されます。
詳細については、 ParameterServerStrategy
のAPIドキュメント文字列を参照してください。
評価
分散トレーニングで評価ループを定義して実行する方法は複数あります。以下に説明するように、それぞれに独自の長所と短所があります。好みがない場合は、インライン評価方法をお勧めします。
インライン評価
この方法では、コーディネーターはトレーニングと評価を交互に行うため、インライン評価と呼びます。インライン評価にはいくつかの利点があります。たとえば、単一のタスクでは保持できない大規模な評価モデルや評価データセットをサポートできます。別の例として、評価結果を使用して、次のエポックをトレーニングするための決定を下すことができます。
インライン評価を実装するには、次の2つの方法があります。
- 直接評価-小さなモデルと評価データセットの場合、コーディネーターは、コーディネーターの評価データセットを使用して、分散モデルで直接評価を実行できます。
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- 分散評価-コーディネーターで直接実行することが不可能な大規模なモデルまたはデータセットの場合、コーディネータータスクは、
schedule
/join
メソッドを介して評価タスクをワーカーに分散できます。
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
サイドカー評価
もう1つの方法は、サイドカー評価と呼ばれ、チェックポイントを繰り返し読み取り、最新のチェックポイントで評価を実行する専用の評価タスクを作成することです。評価結果に基づいてトレーニングループを変更する必要がない場合は、トレーニングプログラムを早期に終了できます。ただし、評価をトリガーするには、追加の評価タスクと定期的なチェックポイントが必要です。以下は、可能なサイドカー評価ループです。
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
実世界のクラスター
実際の実稼働環境では、すべてのタスクをさまざまなマシンのさまざまなプロセスで実行します。各タスクでクラスター情報を構成する最も簡単な方法は、「TF_CONFIG」環境変数を設定し、 TFConfigClusterResolver
を使用して「TF_CONFIG」を解析することです。 「TF_CONFIG」環境変数の一般的な説明については、 分散トレーニングガイドを参照してください。
Kubernetesまたはその他の構成テンプレートを使用してトレーニングタスクを開始する場合、これらのテンプレートですでに「TF_CONFIG」が設定されている可能性があります。
「TF_CONFIG」環境変数を設定します
3つのワーカーと2つのパラメーターサーバーがあるとすると、ワーカー1の「TF_CONFIG」は次のようになります。
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
評価者の「TF_CONFIG」は次のようになります。
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
上記のエバリュエーターの「TF_CONFIG」文字列の「cluster」部分はオプションです。
すべてのタスクに同じバイナリを使用する場合
単一のバイナリを使用してこれらすべてのタスクを実行する場合は、最初にプログラムをさまざまな役割に分岐させる必要があります。
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# run side-car evaluation
else:
# run the coordinator.
次のコードはTensorFlowサーバーを起動し、待機します。
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
タスクの失敗の処理
労働者の失敗
上記のように、 ClusterCoordinator
は、ワーカーの障害に対するフォールトトレランスが組み込まれています。ワーカーの回復時に、 create_per_worker_dataset
によって作成され、まだスコープ内にあるデータセットの対応するスライスは、 create_per_worker_dataset
渡された元のdataset_fn
を呼び出すことによって再作成されcreate_per_worker_dataset
。
パラメーターサーバーまたはコーディネーターの障害
ただし、コーディネーターがパラメーターサーバーエラーを検出すると、すぐにUnavailableError
またはAbortedError
します。この場合、コーディネーターを再起動できます。コーディネーター自体も利用できなくなる可能性があります。したがって、トレーニングの進行状況をあまり失わないようにするには、トレーニングを開始する前に、モデル変数を定期的にチェックポイントし、チェックポイントからモデル変数をロードすることが重要です。オプティマイザーがチェックポイントoptimizer.iterations
場合、トレーニングの進行状況は、optimizer.iterationsからおおよそ推測できます。
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
RemoteValue
取得
関数が正常に実行された場合、 RemoteValue
フェッチは成功することが保証されています。これは、現在、関数の実行後すぐに戻り値がコーディネーターにコピーされるためです。コピー中にワーカー障害が発生した場合、関数は別の使用可能なワーカーで再試行されます。したがって、パフォーマンスを最適化する場合は、戻り値なしで関数をスケジュールできます。
エラー報告
コーディネーターは、パラメーターサーバーからのUnavailableError
などのエラー、またはtf.debugging.check_numerics
からのInvalidArgument
などの他のアプリケーションエラーをtf.debugging.check_numerics
すると、エラーを発生させる前に、保留中およびキューに入れられたすべての関数をキャンセルします。対応するRemoteValue
フェッチすると、 CancelledError
ます。
エラーが発生した後、コーディネーターは同じエラーまたはキャンセルされた関数からのエラーを発生させません。
パフォーマンスの向上
ParameterServerStrategy
とClusterResolver
を使用してトレーニングするときにパフォーマンスの問題が発生する場合は、いくつかの理由が考えられClusterResolver
。
一般的な理由の1つは、パラメーターサーバーの負荷が不均衡であり、一部の高負荷のパラメーターサーバーが容量に達していることです。複数の根本原因も考えられます。この問題を軽減するためのいくつかの簡単な方法は次のとおりです。
- 指定経由で大規模なモデル変数をシャード
variable_partitioner
構築する際にParameterServerStrategy
。 - 可能であれば、すべてのパラメーターサーバーで必要なホットスポット変数を1つのステップで作成することは避けてください。たとえば、オプティマイザーで一定の学習率またはサブクラス
tf.keras.optimizers.schedules.LearningRateSchedule
します。これは、デフォルトの動作では、学習率が特定のパラメーターサーバーに配置され、各ステップで他のすべてのパラメーターサーバーによって要求される変数になるためです。 。 - 大きな語彙をシャッフルしてから、Keras前処理レイヤーに渡します。
パフォーマンスの問題のもう1つの考えられる理由は、コーディネーターです。 schedule
/ join
最初の実装はPythonベースであるため、スレッドのオーバーヘッドが発生する可能性があります。また、コーディネーターとワーカーの間の待ち時間が長くなる可能性があります。この場合、複数のステップを1つのtf.function
パックできます。
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
コーディネーターの最適化を継続し、将来的にほとんどのユーザーが手動でステップをパックする必要がないことを願っています。
さらに、パフォーマンスを向上させるための小さなトリックは、上記のタスク障害の処理のセクションで説明したように、戻り値なしで関数をスケジュールすることです。
既知の制限
既知の制限のほとんどは、上記のセクションで説明されています。要約は次のとおりです。
- フォールトトレランスを適切に機能させるには、コーディネーターを含むすべてのタスクで
os.environment["grpc_fail_fast"]="use_caller"
が必要です。 - GPUワーカーはサポートされていません。
- 同期パラメータサーバートレーニングはサポートされていません。
-
ParameterServerStrategy
は、Kerasのcompile
およびfit
APIでは機能しません。 -
ClusterCoordinator.schedule
は、データセットの訪問保証をサポートしていません。 -
ClusterCoordinator.create_per_worker_dataset
を使用する場合、データセット全体を、渡された関数内に作成する必要があります。 - 通常、最適なパフォーマンスを実現するには、複数のステップを1つの関数にまとめる必要があります。
- シャーディングされた変数を含むtf.saved_model.loadを介して
tf.saved_model.load
をロードすることはサポートされていません。 TensorFlowServingを使用してこのようなsaved_modelをロードすると機能することが期待されます。 - シャードオプティマイザースロット変数を含むチェックポイントを異なる数のシャードにロードすることはサポートされていません。
- コーディネータータスクを再起動せずにパラメーターサーバーの障害から回復することはサポートされていません。