KerasとMultiWorkerMirroredStrategyを使用したカスタムトレーニングループ

TensorFlow.orgで表示GoogleColabで実行GitHubでソースを表示 ノートブックをダウンロード

概要

このチュートリアルでは、MultiWorkerMirroredStrategyを介して配布されるカスタムトレーニングループAPIを使用したマルチワーカートレーニングを示します。そのため、シングルワーカーで実行するように設計されたKerasモデルは、最小限のコード変更で複数のワーカーをシームレスに操作できます。

カスタムトレーニングループを使用してモデルをトレーニングしています。これは、トレーニングの柔軟性と制御性が向上するためです。さらに、モデルとトレーニングループのデバッグが簡単になります。より詳細な情報は、トレーニングループを最初から作成するにあります。

MultiWorkerMirroredStrategymodel.fitで使用する方法をお探しの場合は、代わりにこのチュートリアルを参照してください。

TensorFlowの分散トレーニングガイドは、tf.distribute.StrategyAPI tf.distribute.Strategyより深い理解に関心のある人のためにTensorFlowがサポートする分散戦略の概要について利用できます。

設定

まず、いくつかの必要なインポート。

import json
import os
import sys

TensorFlowをインポートする前に、環境にいくつかの変更を加えます。

すべてのGPUを無効にします。これにより、すべてのワーカーが同じGPUを使用しようとすることによって発生するエラーが防止されます。実際のアプリケーションの場合、各ワーカーは異なるマシン上にあります。

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG環境変数をリセットします。これについては、後で詳しく説明します。

os.environ.pop('TF_CONFIG', None)

現在のディレクトリがPythonのパス上にあることを確認してください。これにより、ノートブックは後で%%writefileによって書き込まれたファイルをインポートできます。

if '.' not in sys.path:
  sys.path.insert(0, '.')

次に、TensorFlowをインポートします。

import tensorflow as tf

データセットとモデルの定義

次に、単純なモデルとデータセットのセットアップを使用してmnist.pyファイルを作成します。このPythonファイルは、このチュートリアルのワーカープロセスによって使用されます。

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

マルチワーカー構成

それでは、マルチワーカートレーニングの世界に入りましょう。 TensorFlowでは、 TF_CONFIG環境変数は、それぞれが異なる役割を持つ可能性のある複数のマシンでのトレーニングに必要です。以下で使用されるTF_CONFIGは、クラスターの一部である各ワーカーのクラスター構成を指定するために使用されるJSON文字列です。これは、 cluster_resolver.TFConfigClusterResolverを使用してクラスターを指定するためのデフォルトの方法ですが、 distribute.cluster_resolverモジュールで使用できる他のオプションがあります。

クラスターについて説明する

構成例は次のとおりです。

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

これは、JSON文字列としてシリアル化された同じTF_CONFIGです。

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIGには、 clustertaskの2つのコンポーネントがあります。

  • clusterはすべてのワーカーで同じであり、 workerなどのさまざまなタイプのジョブで構成されるdictであるトレーニングクラスターに関する情報を提供します。 MultiWorkerMirroredStrategyを使用したマルチワーカートレーニングでは、通常のworkerが行うことに加えて、チェックポイントの保存やTensorBoardのサマリーファイルの作成など、もう少し責任を負うワーカーが1 workerいます。このようなワーカーはchiefワーカーと呼ばれ、 index 0のworkerがチーフworkerとして任命されるのが通例です(実際、これがtf.distribute.Strategyの実装方法です)。

  • taskは現在のタスクの情報を提供し、ワーカーごとに異なります。そのワーカーのtypeindexを指定します。

この例では、タスクtype"worker"に設定し、タスクindex0に設定します。この機械は最初の労働者であり、主任労働者として任命され、他の機械よりも多くの仕事をします。他のマシンでもTF_CONFIG環境変数を設定する必要があり、 cluster dictは同じである必要がありますが、それらのマシンの役割に応じてタスクtypeまたはタスクindexが異なることに注意してください。

説明のために、このチュートリアルでは、 localhostで2つのワーカーを使用してTF_CONFIGを設定する方法を示します。実際には、ユーザーは外部IPアドレス/ポートに複数のワーカーを作成し、各ワーカーにTF_CONFIGを適切に設定します。

この例では、2つのワーカーを使用します。最初のワーカーのTF_CONFIGは上に示されています。 2番目のワーカーの場合、 tf_config['task']['index']=1を設定します

上記では、 tf_configはPythonの単なるローカル変数です。実際にトレーニングを構成するために使用するには、このディクショナリをJSONとしてシリアル化し、 TF_CONFIG環境変数に配置する必要があります。

ノートブックの環境変数とサブプロセス

サブプロセスは、親から環境変数を継承します。したがって、このjupyter notebookプロセスで環境変数を設定すると、次のようになります。

os.environ['GREETINGS'] = 'Hello TensorFlow!'

サブプロセスから環境変数にアクセスできます。

echo ${GREETINGS}
Hello TensorFlow!

次のセクションでは、これを使用してTF_CONFIGをワーカーサブプロセスに渡します。この方法で実際にジョブを起動することは決してありませんが、このチュートリアルの目的には十分です。最小限のマルチワーカーの例を示すためです。

MultiWorkerMirroredStrategy

モデルをトレーニングするには、 tf.distribute.MultiWorkerMirroredStrategyのインスタンスを使用します。これにより、すべてのワーカーの各デバイスのモデルのレイヤーにあるすべての変数のコピーが作成されます。 tf.distribute.Strategyガイドには、この戦略の詳細が記載されています。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

tf.distribute.Strategy.scopeを使用して、モデルを構築するときに戦略を使用する必要があることを指定します。これにより、この戦略の「レプリカ間のコンテキスト」になります。つまり、戦略は変数の配置などを制御できます。

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

ワーカー間でデータを自動シャーディング

マルチワーカートレーニングでは、データセットシャーディングは必ずしも必要ではありませんが、1回限りのセマンティクスが提供されるため、より多くのトレーニングの再現性が向上します。つまり、複数のワーカーでのトレーニングは、1人のワーカーでのトレーニングと同じである必要があります。注:パフォーマンスが影響を受ける場合があります。

参照: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

カスタムトレーニングループを定義し、モデルをトレーニングします

オプティマイザーを指定する

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

tf.functionを使用してトレーニングステップを定義します

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

チェックポイントの保存と復元

カスタムトレーニングループでのチェックポイントの実装では、ユーザーがkerasコールバックを使用する代わりにそれを処理する必要があります。モデル全体を保存しなくても、モデルの重みを保存して復元できます。

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

ここでは、モデルを追跡する1つのtf.train.Checkpointを作成します。これは、 tf.train.CheckpointManagerによって管理され、最新のチェックポイントのみが保持されます。

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

これで、復元する必要があるときに、便利tf.train.latest_checkpoint関数を使用して保存された最新のチェックポイントを見つけることができます。

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

チェックポイントを復元した後、カスタムトレーニングループのトレーニングを続行できます。

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

ワーカーでの完全なコード設定

MultiWorkerMirroredStrategyで実際に実行するには、ワーカープロセスを実行し、それらにTF_CONFIGを渡す必要があります。

前に書いたmnist.pyファイルのように、これはこのコラボで前にステップバイステップでウォークスルーしたのと同じコードを含むmain.pyです。ファイルに書き込んでいるので、各ワーカーがそれを実行します。

ファイル: main.py

Writing main.py

トレーニングと評価

現在のディレクトリには、両方のPythonファイルが含まれています。

ls *.py
main.py
mnist.py
プレースホルダー28

したがって、json- TF_CONFIGをシリアル化し、環境変数に追加します。

os.environ['TF_CONFIG'] = json.dumps(tf_config)

これで、 TF_CONFIGを実行してmain.pyを使用するワーカープロセスを起動できます。

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
プレースホルダー32

上記のコマンドについて注意すべき点がいくつかあります。

  1. ノートブックの「魔法」である%%bashを使用して、いくつかのbashコマンドを実行します。
  2. このワーカーは終了しないため、 --bgフラグを使用してバックグラウンドでbashプロセスを実行します。開始する前にすべてのワーカーを待ちます。

バックグラウンドのワーカープロセスはこのノートブックに出力を出力しないため、 &>は出力をファイルにリダイレクトし、何が起こったかを確認できます。

したがって、プロセスが起動するまで数秒待ちます。

import time
time.sleep(20)

ここで、これまでにワーカーのログファイルに出力されたものを確認します。

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

ログファイルの最後の行には、次のように表示されますStarted server with target: grpc://localhost:12345 。これで最初のワーカーの準備が整い、他のすべてのワーカーの準備が整うのを待っています。

したがって、2番目のワーカーのプロセスのtf_configを更新して、次のようにします。

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

次に、2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。

python main.py > /dev/null 2>&1

ここで、最初のワーカーによって書き込まれたログを再確認すると、そのモデルのトレーニングに参加したことがわかります。

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
プレースホルダー40l10n-プレースホルダー
All background processes were killed.

マルチワーカートレーニングの詳細

このチュートリアルでは、マルチワーカー設定のCustom Training Loopワークフローを示しました。他のトピックの詳細な説明は、マルチワーカーセットアップのmodel.fit's guideにあり、CTLに適用できます。

も参照してください

  1. TensorFlowガイドの分散トレーニングでは、利用可能な配布戦略の概要を説明しています。
  2. 公式モデル。その多くは、複数の配布戦略を実行するように構成できます。
  3. ガイドの「パフォーマンス」セクションには、TensorFlowモデルのパフォーマンスを最適化するために使用できる他の戦略とツールに関する情報が記載されています。