KerasとMultiWorkerMirroredStrategyを使用したカスタムトレーニングループ

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示ノートブックをダウンロードする

概要概要

このチュートリアルでは、MultiWorkerMirroredStrategyを介して配布されるカスタムトレーニングループAPIを使用したマルチワーカートレーニングを示します。これにより、シングルワーカーで実行するように設計されたKerasモデルは、最小限のコード変更で複数のワーカーでシームレスに作業できます。

カスタムトレーニングループを使用してモデルをトレーニングしています。これは、トレーニングの柔軟性と制御性が向上するためです。さらに、モデルとトレーニングループのデバッグが簡単になります。より詳細な情報は、トレーニングループを最初から作成するにあります。

keras model.fitMultiWorkerMirroredStrategyを使用する方法をお探しの場合は、代わりにこのチュートリアルを参照してください

TensorFlowの中に分散トレーニングガイドでは、流通戦略のより深い理解に興味のある人のためのTensorFlow支持体の概観のために利用可能であるtf.distribute.StrategyのAPI。

セットアップ

まず、いくつかの必要なインポート。

import json
import os
import sys

TensorFlowをインポートする前に、環境にいくつかの変更を加えます。

すべてのGPUを無効にします。これにより、すべてのワーカーが同じGPUを使用しようとすることによって発生するエラーが防止されます。実際のアプリケーションでは、各ワーカーは異なるマシン上にあります。

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG環境変数をリセットします。これについては、後で詳しく説明します。

os.environ.pop('TF_CONFIG', None)

現在のディレクトリがPythonのパス上にあることを確認してください。これにより、ノートブックは後で%%writefileによって書き込まれたファイルをインポートできます。

if '.' not in sys.path:
  sys.path.insert(0, '.')

次に、TensorFlowをインポートします。

import tensorflow as tf

データセットとモデルの定義

次に、単純なモデルとデータセットのセットアップを使用してmnist.pyファイルを作成します。このPythonファイルは、このチュートリアルのワーカープロセスによって使用されます。

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

マルチワーカー構成

それでは、マルチワーカートレーニングの世界に入りましょう。 TensorFlowでは、 TF_CONFIG環境変数は、それぞれが異なる役割を持つ可能性のある複数のマシンでのトレーニングに必要です。以下で使用されるTF_CONFIGは、クラスターの一部である各ワーカーのクラスター構成を指定するために使用されるJSON文字列です。これは、 cluster_resolver.TFConfigClusterResolverを使用してクラスターを指定するためのデフォルトの方法ですが、 distribute.cluster_resolverモジュールで使用できる他のオプションがあります。

クラスターについて説明する

構成例は次のとおりです。

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

これは、JSON文字列としてTF_CONFIG化された同じTF_CONFIGです。

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG clustertask 2つのコンポーネントがありtask

  • clusterはすべてのワーカーで同じであり、 workerなどのさまざまなタイプのジョブで構成されるdictであるトレーニングクラスターに関する情報を提供します。 MultiWorkerMirroredStrategyマルチワーカートレーニングでは、通常のworkerが行うことに加えて、チェックポイントの保存やTensorBoardの要約ファイルの作成などのもう少し責任を負うworkerが通常1 workerいます。このような労働者は、次のように呼ばれているchief労働者、そしてあることが通例であるworkerindex 0がチーフに任命されたworker (実際には、これはどのようにあるtf.distribute.Strategy実装されています)。

  • taskは現在のタスクの情報を提供し、ワーカーごとに異なります。そのワーカーのtypeindexを指定します。

この例では、タスクtype"worker"し、タスクindex0ます。この機械は最初の労働者であり、主任労働者として任命され、他の機械よりも多くの仕事をします。他のマシンでもTF_CONFIG環境変数を設定する必要があり、 cluster dictは同じである必要がありclusterが、それらのマシンの役割に応じてタスクtypeまたはタスクindexが異なることに注意してください。

説明のために、このチュートリアルでは、 localhostで2つのワーカーをTF_CONFIGしてTF_CONFIGを設定する方法を示します。実際には、ユーザーは外部IPアドレス/ポートに複数のワーカーを作成し、各ワーカーにTF_CONFIGを適切に設定します。

この例では、2つのワーカーを使用します。最初のワーカーのTF_CONFIGは上に示されています。 2番目のワーカーの場合、 tf_config['task']['index']=1

上記のtf_configは、Pythonの単なるローカル変数です。実際にトレーニングを構成するために使用するには、このディクショナリをJSONとしてシリアル化し、 TF_CONFIG環境変数に配置する必要があります。

ノートブックの環境変数とサブプロセス

サブプロセスは、親から環境変数を継承します。したがって、このjupyter notebookプロセスで環境変数を設定すると、次のようになります。

os.environ['GREETINGS'] = 'Hello TensorFlow!'

サブプロセスから環境変数にアクセスできます。

echo ${GREETINGS}
Hello TensorFlow!

次のセクションでは、これを使用してTF_CONFIGをワーカーサブプロセスに渡します。この方法で実際にジョブを起動することは決してありませんが、このチュートリアルの目的には十分です。最小限のマルチワーカーの例を示すためです。

MultiWorkerMirroredStrategy

モデルをトレーニングするには、 tf.distribute.MultiWorkerMirroredStrategyインスタンスを使用します。これにより、すべてのワーカーの各デバイスのモデルのレイヤーにあるすべての変数のコピーが作成されます。 tf.distribute.Strategyガイドには、この戦略の詳細が記載されています。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

tf.distribute.Strategy.scopeを使用して、モデルを構築するときに戦略を使用する必要があることを指定します。これにより、この戦略の「レプリカ間のコンテキスト」になります。つまり、戦略は変数の配置などを制御できるようになります。

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

ワーカー間でデータを自動シャーディング

マルチワーカートレーニングでは、データセットシャーディングは必ずしも必要ではありませんが、セマンティックが1回だけ提供されるため、トレーニングの再現性が高まります。つまり、複数のワーカーでのトレーニングは、1人のワーカーでのトレーニングと同じである必要があります。注:パフォーマンスが影響を受ける場合があります。

参照: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

カスタムトレーニングループを定義し、モデルをトレーニングします

オプティマイザを指定します

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

tf.functionを使用してトレーニングステップを定義しtf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

チェックポイントの保存と復元

カスタムトレーニングループでのチェックポイントの実装では、ユーザーがkerasコールバックを使用する代わりにそれを処理する必要があります。モデル全体を保存しなくても、モデルの重みを保存して復元できます。

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id):
  return task_type is None or task_type == 'chief' or (task_type == 'worker' and
                                                       task_id == 0)
def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

ここでは、モデルを追跡する1つのtf.train.Checkpointを作成します。これは、 tf.train.CheckpointManagerによって管理され、最新のチェックポイントのみが保持されるようにします。

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

これで、復元する必要があるときに、便利なtf.train.latest_checkpoint関数を使用して保存された最新のチェックポイントを見つけることができます。

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

チェックポイントを復元した後、カスタムトレーニングループのトレーニングを続行できます。

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
Epoch: 0, accuracy: 0.819531, train_loss: 0.561418.
Epoch: 1, accuracy: 0.938616, train_loss: 0.206848.
Epoch: 2, accuracy: 0.954799, train_loss: 0.146723.

ワーカーでの完全なコード設定

MultiWorkerMirroredStrategy実際に実行するには、ワーカープロセスを実行し、それらにTF_CONFIGを渡す必要があります。

前に書いたmnist.pyファイルのように、これはこのコラボで前にステップバイステップでmain.pyしたのと同じコードを含むmain.pyです。ファイルに書き込んでいるので、各ワーカーがそれを実行します。

ファイル: main.py

Writing main.py

トレーニングと評価

現在のディレクトリには、両方のPythonファイルが含まれています。

ls *.py
main.py
mnist.py

したがって、json- TF_CONFIGをシリアル化し、環境変数に追加します。

os.environ['TF_CONFIG'] = json.dumps(tf_config)

さて、あなたが実行され、ワーカープロセスを起動することができmain.pyし、使用TF_CONFIG

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

上記のコマンドについて注意すべき点がいくつかあります。

  1. ノートブックの「魔法」である%%bashを使用して、いくつかのbashコマンドを実行します。
  2. このワーカーは終了しないため、 --bgフラグを使用してバックグラウンドでbashプロセスを実行します。それが始まる前にそれはすべての労働者を待ちます。

バックグラウンドのワーカープロセスはこのノートブックに出力を出力しないため、 &>は出力をファイルにリダイレクトし、何が起こったかを確認できます。

したがって、プロセスが開始するまで数秒待ちます。

import time
time.sleep(20)

ここで、これまでにワーカーのログファイルに出力されたものを確認します。

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

ログファイルの最後の行には、次のように表示されます。 Started server with target: grpc://localhost:12345 。これで最初のワーカーの準備が整い、他のすべてのワーカーの準備が整うのを待っています。

したがって、2番目のワーカーのプロセスがtf_configするようにtf_configを更新します。

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

次に、2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。

python main.py > /dev/null 2>&1

ここで、最初のワーカーによって書き込まれたログを再確認すると、そのモデルのトレーニングに参加していることがわかります。

cat job_0.log
2021-06-16 18:42:16.160677: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-06-16 18:42:17.271468: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-06-16 18:42:18.215075: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-06-16 18:42:18.215137: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215146: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-884136203
2021-06-16 18:42:18.215282: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 465.27.0
2021-06-16 18:42:18.215316: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-06-16 18:42:18.215323: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 465.27.0
2021-06-16 18:42:18.216043: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-06-16 18:42:18.220983: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-06-16 18:42:18.221439: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
2021-06-16 18:42:39.265636: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-06-16 18:42:39.266014: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2000179999 Hz
Epoch: 0, accuracy: 0.836384, train_loss: 0.517218.
Epoch: 1, accuracy: 0.937277, train_loss: 0.200661.
Epoch: 2, accuracy: 0.961161, train_loss: 0.137424.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

マルチワーカートレーニングの詳細

このチュートリアルでは、マルチワーカー設定のCustom Training Loopワークフローを示しました。他のトピックの詳細な説明は、マルチワーカーセットアップのmodel.fit's guideあり、CTLに適用できます。

も参照してください

  1. TensorFlowガイドの分散トレーニングでは、利用可能な配布戦略の概要を説明しています。
  2. 公式モデル。その多くは、複数の配布戦略を実行するように構成できます。
  3. ガイドの「パフォーマンス」セクションには、TensorFlowモデルのパフォーマンスを最適化するために使用できる他の戦略とツールに関する情報が記載されています。