![]() | ![]() | ![]() | ![]() |
概要
このチュートリアルでは、MultiWorkerMirroredStrategyを介して配布されるカスタムトレーニングループAPIを使用したマルチワーカートレーニングを示します。そのため、シングルワーカーで実行するように設計されたKerasモデルは、最小限のコード変更で複数のワーカーをシームレスに操作できます。
カスタムトレーニングループを使用してモデルをトレーニングしています。これは、トレーニングの柔軟性と制御性が向上するためです。さらに、モデルとトレーニングループのデバッグが簡単になります。より詳細な情報は、トレーニングループを最初から作成するにあります。
MultiWorkerMirroredStrategy
をmodel.fit
で使用する方法をお探しの場合は、代わりにこのチュートリアルを参照してください。
TensorFlowの分散トレーニングガイドは、tf.distribute.StrategyAPI tf.distribute.Strategy
より深い理解に関心のある人のためにTensorFlowがサポートする分散戦略の概要について利用できます。
設定
まず、いくつかの必要なインポート。
import json
import os
import sys
TensorFlowをインポートする前に、環境にいくつかの変更を加えます。
すべてのGPUを無効にします。これにより、すべてのワーカーが同じGPUを使用しようとすることによって発生するエラーが防止されます。実際のアプリケーションの場合、各ワーカーは異なるマシン上にあります。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
TF_CONFIG
環境変数をリセットします。これについては、後で詳しく説明します。
os.environ.pop('TF_CONFIG', None)
現在のディレクトリがPythonのパス上にあることを確認してください。これにより、ノートブックは後で%%writefile
によって書き込まれたファイルをインポートできます。
if '.' not in sys.path:
sys.path.insert(0, '.')
次に、TensorFlowをインポートします。
import tensorflow as tf
データセットとモデルの定義
次に、単純なモデルとデータセットのセットアップを使用してmnist.py
ファイルを作成します。このPythonファイルは、このチュートリアルのワーカープロセスによって使用されます。
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
マルチワーカー構成
それでは、マルチワーカートレーニングの世界に入りましょう。 TensorFlowでは、 TF_CONFIG
環境変数は、それぞれが異なる役割を持つ可能性のある複数のマシンでのトレーニングに必要です。以下で使用されるTF_CONFIG
は、クラスターの一部である各ワーカーのクラスター構成を指定するために使用されるJSON文字列です。これは、 cluster_resolver.TFConfigClusterResolver
を使用してクラスターを指定するためのデフォルトの方法ですが、 distribute.cluster_resolver
モジュールで使用できる他のオプションがあります。
クラスターについて説明する
構成例は次のとおりです。
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
これは、JSON文字列としてシリアル化された同じTF_CONFIG
です。
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
TF_CONFIG
には、 cluster
とtask
の2つのコンポーネントがあります。
cluster
はすべてのワーカーで同じであり、worker
などのさまざまなタイプのジョブで構成されるdictであるトレーニングクラスターに関する情報を提供します。MultiWorkerMirroredStrategy
を使用したマルチワーカートレーニングでは、通常のworker
が行うことに加えて、チェックポイントの保存やTensorBoardのサマリーファイルの作成など、もう少し責任を負うワーカーが1worker
います。このようなワーカーはchief
ワーカーと呼ばれ、index
0のworker
がチーフworker
として任命されるのが通例です(実際、これがtf.distribute.Strategy
の実装方法です)。task
は現在のタスクの情報を提供し、ワーカーごとに異なります。そのワーカーのtype
とindex
を指定します。
この例では、タスクtype
を"worker"
に設定し、タスクindex
を0
に設定します。この機械は最初の労働者であり、主任労働者として任命され、他の機械よりも多くの仕事をします。他のマシンでもTF_CONFIG
環境変数を設定する必要があり、 cluster
dictは同じである必要がありますが、それらのマシンの役割に応じてタスクtype
またはタスクindex
が異なることに注意してください。
説明のために、このチュートリアルでは、 localhost
で2つのワーカーを使用してTF_CONFIG
を設定する方法を示します。実際には、ユーザーは外部IPアドレス/ポートに複数のワーカーを作成し、各ワーカーにTF_CONFIG
を適切に設定します。
この例では、2つのワーカーを使用します。最初のワーカーのTF_CONFIG
は上に示されています。 2番目のワーカーの場合、 tf_config['task']['index']=1
を設定します
上記では、 tf_config
はPythonの単なるローカル変数です。実際にトレーニングを構成するために使用するには、このディクショナリをJSONとしてシリアル化し、 TF_CONFIG
環境変数に配置する必要があります。
ノートブックの環境変数とサブプロセス
サブプロセスは、親から環境変数を継承します。したがって、このjupyter notebook
プロセスで環境変数を設定すると、次のようになります。
os.environ['GREETINGS'] = 'Hello TensorFlow!'
サブプロセスから環境変数にアクセスできます。
echo ${GREETINGS}
Hello TensorFlow!
次のセクションでは、これを使用してTF_CONFIG
をワーカーサブプロセスに渡します。この方法で実際にジョブを起動することは決してありませんが、このチュートリアルの目的には十分です。最小限のマルチワーカーの例を示すためです。
MultiWorkerMirroredStrategy
モデルをトレーニングするには、 tf.distribute.MultiWorkerMirroredStrategy
のインスタンスを使用します。これにより、すべてのワーカーの各デバイスのモデルのレイヤーにあるすべての変数のコピーが作成されます。 tf.distribute.Strategy
ガイドには、この戦略の詳細が記載されています。
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
tf.distribute.Strategy.scope
を使用して、モデルを構築するときに戦略を使用する必要があることを指定します。これにより、この戦略の「レプリカ間のコンテキスト」になります。つまり、戦略は変数の配置などを制御できます。
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
ワーカー間でデータを自動シャーディング
マルチワーカートレーニングでは、データセットシャーディングは必ずしも必要ではありませんが、1回限りのセマンティクスが提供されるため、より多くのトレーニングの再現性が向上します。つまり、複数のワーカーでのトレーニングは、1人のワーカーでのトレーニングと同じである必要があります。注:パフォーマンスが影響を受ける場合があります。
参照: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
カスタムトレーニングループを定義し、モデルをトレーニングします
オプティマイザーを指定する
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
tf.function
を使用してトレーニングステップを定義します
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
チェックポイントの保存と復元
カスタムトレーニングループでのチェックポイントの実装では、ユーザーがkerasコールバックを使用する代わりにそれを処理する必要があります。モデル全体を保存しなくても、モデルの重みを保存して復元できます。
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
ここでは、モデルを追跡する1つのtf.train.Checkpoint
を作成します。これは、 tf.train.CheckpointManager
によって管理され、最新のチェックポイントのみが保持されます。
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
これで、復元する必要があるときに、便利tf.train.latest_checkpoint
関数を使用して保存された最新のチェックポイントを見つけることができます。
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
チェックポイントを復元した後、カスタムトレーニングループのトレーニングを続行できます。
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
ワーカーでの完全なコード設定
MultiWorkerMirroredStrategy
で実際に実行するには、ワーカープロセスを実行し、それらにTF_CONFIG
を渡す必要があります。
前に書いたmnist.py
ファイルのように、これはこのコラボで前にステップバイステップでウォークスルーしたのと同じコードを含むmain.py
です。ファイルに書き込んでいるので、各ワーカーがそれを実行します。
ファイル: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
トレーニングと評価
現在のディレクトリには、両方のPythonファイルが含まれています。
ls *.py
main.py mnist.pyプレースホルダー28
したがって、json- TF_CONFIG
をシリアル化し、環境変数に追加します。
os.environ['TF_CONFIG'] = json.dumps(tf_config)
これで、 TF_CONFIG
を実行してmain.py
を使用するワーカープロセスを起動できます。
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
プレースホルダー32上記のコマンドについて注意すべき点がいくつかあります。
- ノートブックの「魔法」である
%%bash
を使用して、いくつかのbashコマンドを実行します。 - このワーカーは終了しないため、
--bg
フラグを使用してバックグラウンドでbash
プロセスを実行します。開始する前にすべてのワーカーを待ちます。
バックグラウンドのワーカープロセスはこのノートブックに出力を出力しないため、 &>
は出力をファイルにリダイレクトし、何が起こったかを確認できます。
したがって、プロセスが起動するまで数秒待ちます。
import time
time.sleep(20)
ここで、これまでにワーカーのログファイルに出力されたものを確認します。
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
ログファイルの最後の行には、次のように表示されますStarted server with target: grpc://localhost:12345
。これで最初のワーカーの準備が整い、他のすべてのワーカーの準備が整うのを待っています。
したがって、2番目のワーカーのプロセスのtf_config
を更新して、次のようにします。
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
次に、2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。
python main.py > /dev/null 2>&1
ここで、最初のワーカーによって書き込まれたログを再確認すると、そのモデルのトレーニングに参加したことがわかります。
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
プレースホルダー40l10n-プレースホルダーAll background processes were killed.
マルチワーカートレーニングの詳細
このチュートリアルでは、マルチワーカー設定のCustom Training Loop
ワークフローを示しました。他のトピックの詳細な説明は、マルチワーカーセットアップのmodel.fit's guide
にあり、CTLに適用できます。
も参照してください
- TensorFlowガイドの分散トレーニングでは、利用可能な配布戦略の概要を説明しています。
- 公式モデル。その多くは、複数の配布戦略を実行するように構成できます。
- ガイドの「パフォーマンス」セクションには、TensorFlowモデルのパフォーマンスを最適化するために使用できる他の戦略とツールに関する情報が記載されています。