Keras を使ったマルチワーカートレーニング

TensorFlow.org で実行 Google Colab で実行 GitHub でソースを表示 ノートブックをダウンロード

概要

このチュートリアルでは、tf.distribute.Strategy API、具体的にはtf.distribute.MultiWorkerMirroredStrategy クラスを使用して、Keras モデルと Model.fit API によるマルチワーカー分散型トレーニングを実演します。このストラテジーの助けにより、シングルワーカーで実行するように設計された Keras モデルは、最小限のコード変更で複数のワーカーでシームレスに機能することができます。

tf.distribute.Strategy API をさらに学習したい方は、TensorFlow での分散トレーニングガイドで TensorFlow がサポートする分散ストラテジーの概要をご覧ください。

Keras とカスタムループで MultiWorkerMirroredStrategy を使用する方法を学習する場合は、Keras と MultiWorkerMirroredStrategy によるカスタムトレーニングループをご覧ください。

このチュートリアルの目的は、2 つのワーカーを使った最小限のマルチワーカーの例を紹介することです。

セットアップ

まず、必要なものをインポートします。

import json
import os
import sys

TensorFlow をインポートする前に、環境にいくつかの変更を加えます。

  1. すべての GPU を無効にします。 これにより、すべてのワーカーが同じ GPU を使用しようとすることによって発生するエラーが防止されます。 実際のアプリケーションでは、各ワーカーは異なるマシン上にあります。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. TF_CONFIG 環境変数をリセットします(これについては後で詳しく説明します)。
os.environ.pop('TF_CONFIG', None)
  1. 現在のディレクトリが Python のパス上にあることを確認してください。これにより、ノートブックは %%writefile で書き込まれたファイルを後でインポートできるようになります。
if '.' not in sys.path:
  sys.path.insert(0, '.')

次に TensorFlow をインポートします。

import tensorflow as tf
2022-08-08 21:39:57.915090: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-08-08 21:39:58.690088: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:39:58.690349: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:39:58.690362: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.

データセットとモデルの定義

次に、単純なモデルとデータセットの設定を使用してmnist.pyファイルを作成します。 この Python ファイルは、このチュートリアルのワーカープロセスによって使用されます。

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

シングルワーカーでのモデルのトレーニング

まず、少数のエポックでモデルをトレーニングし、シングルワーカーで結果を観察して、すべてが正しく機能していることを確認します。エポックが進むにつれ、損失が下降し、精度が 1.0 に近づくはずです。

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
2022-08-08 21:40:00.102740: E tensorflow/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 7ms/step - loss: 2.2785 - accuracy: 0.1308
Epoch 2/3
70/70 [==============================] - 1s 7ms/step - loss: 2.2134 - accuracy: 0.2924
Epoch 3/3
70/70 [==============================] - 0s 7ms/step - loss: 2.1456 - accuracy: 0.4295
<keras.callbacks.History at 0x7f3dd2cbbc10>

マルチワーカー構成

では、マルチワーカートレーニングの世界を覗いてみましょう。

ジョブとタスクのクラスタ

TensorFlow では、分散トレーニングには、いくつかのジョブが含まれる 'cluster' があり、各ジョブには 1 つ以上の 'task' が含まれることがあります。

それぞれに異なる役割をもつ複数のマシンでトレーニングするには TF_CONFIG 環境変数が必要です。TF_CONFIG は JSON 文字列で、クラスタの一部である各ワーカーのクラスタ構成を指定するために使用されます。

TF_CONFIG 変数には、'cluster''task' の 2 つのコンポーネントがあります。

  • 'cluster' はすべてのワーカーに共通し、トレーニングクラスタに関する情報を、'worker' または 'chief' などのさまざまなジョブの種類で構成される dict として提供します。

    • tf.distribute.MultiWorkerMirroredStrategy によるマルチワーカートレーニングでは通常、'worker' が通常行うことのほかにチェックポイントの保存や TensorBoard 用のサマリーファイルの書き込みといった役割を果たす 1 つの 'worker' があります。こういった 'worker' はチーフワーカー(ジョブ名は 'chief')と呼ばれます。
    • 通例、'chief' には 'index' 0 が指定されます(実際、tf.distribute.Strategy はそのように実装されています)。
  • 'task' は現在のタスクの情報を提供し、ワーカーごとに異なります。タスクはそのワーカーの 'type''index' を指定します。

以下に構成例を示します。

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

これは、JSON 文字列としてシリアル化された同じTF_CONFIGです。

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

tf_config は Python 単なるローカル変数です。トレーニング構成で使用するには、この dict を JSON としてシリアル化し、TF_CONFIG 環境変数に配置する必要があります。

上記の構成例では、タスク 'type''worker' に設定し、タスク 'index'0 に設定しています。そのため、このマシンが最初のワーカーとなります。'chief' ワーカーとして指定されることになるため、ほかのワーカーよりも多くの作業を行います。

注意: 他のマシンにも TF_CONFIG 環境変数を設定し、同じ 'cluster' dict が必要となりますが、それらのマシンの役割に応じた異なるタスク 'type' またはタスク 'index' が必要となります。

説明の目的により、このチュートリアルではある localhost の 2 つのワーカーでどのようにTF_CONFIG 変数をセットアップできるかを示しています。

実際には、外部 IP aドレス/ポートに複数のワーカーを作成して、ワーカーごとに適宜 TF_CONFIG 変数を設定する必要があります。

このチュートリアルでは、2 つのワーカーを使用します。

  • 最初の ('chief') ワーカーの TF_CONFIG は上記に示す通りです。
  • 2 つ目のワーカーでは、tf_config['task']['index']=1 を設定します。

ノートブックの環境変数とサブプロセス

サブプロセスは、親プロセスの環境変数を継承します。

たとえば、この Jupyter ノートブックのプロセスでは、環境変数を次のように設定できます。

os.environ['GREETINGS'] = 'Hello TensorFlow!'

すると、サブプロセスからその環境変数にアクセスできます。

echo ${GREETINGS}
Hello TensorFlow!

次のセクションでは、似たような方法で、TF_CONFIG をワーカーのサブプロセスに渡します。実際に行う場合はこのようにしてジョブを起動することはありませんが、この例では十分です。

適切なストラテジーを選択する

TensorFlow では、以下の 2 つの分散型トレーニングがあります。

  • 同期トレーニング: トレーニングのステップがワーカーとレプリカ間で同期されます。
  • 非同期トレーニング: トレーニングステップが厳密に同期されません(パラメータサーバートレーニングなど)。

このチュートリアルでは、tf.distribute.MultiWorkerMirroredStrategy のインスタンスを使用して、同期マルチワーカートレーニングを実行する方法を示します。

MultiWorkerMirroredStrategyは、すべてのワーカーの各デバイスにあるモデルのレイヤーにすべての変数のコピーを作成します。集合通信に使用する TensorFlow 演算子CollectiveOpsを使用して勾配を集め、変数の同期を維持します。このストラテジーの詳細は、tf.distribute.Strategyガイドで説明されています。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

注意: MultiWorkerMirroredStrategyが呼び出されると、TF_CONFIGが解析され、TensorFlow の GRPC サーバーが開始します。そのため、TF_CONFIG環境変数は、tf.distribute.Strategyインスタンスが作成される前に設定しておく必要があります。TF_CONFIGはまだ設定されていないため、上記の戦略は実質的にシングルワーカーのトレーニングです。

MultiWorkerMirroredStrategy は、tf.distribute.experimental.CommunicationOptions パラメータを介して複数の実装を提供します。1) RING は gRPC をクロスホスト通信レイヤーとして使用して、リング状の集合体を実装します。2) NCCLNVIDIA Collective Communication Library を使用して集合体を実装します。3) AUTO はその選択をランタイムに任せます。集合体の最適な実装は GPU の数と種類、およびクラスタ内のネットワーク相互接続によって異なります。

モデルのトレーニング

tf.kerastf.distribute.Strategy API を統合したため、トレーニングをマルチワーカーに分散するには、モデルビルディングとmodel.compile()呼び出しをstrategy.scope()内に収めるように変更することだけが必要となりました。この分散ストラテジーのスコープは、どこでどのように変数が作成されるかを指定し、MultiWorkerMirroredStrategyの場合、作成される変数はMirroredVariableで、各ワーカーに複製されます。

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

注意: 現在のところ、MultiWorkerMirroredStrategy には、TensorFlow 演算子をストラテジーのインスタンスが作成された後に作成する必要があるという制限があります。RuntimeError: Collective ops must be configured at program startup が表示される場合は、プログラムのはじめに MultiWorkerMirroredStrategy のインスタンスを作成するようにし、演算子を作成するコードをストラテジーがインスタンス化される後に配置するようにしてください。

MultiWorkerMirroredStrategyで実際に実行するには、ワーカープロセスを実行し、TF_CONFIGをそれらに渡す必要があります。

前に記述したmnist_setup.pyファイルと同様に、各ワーカーが実行するmain.pyは次のとおりです。

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

上記のコードスニペットでは、Dataset.batchに渡されるglobal_batch_sizeper_worker_batch_size * num_workersに設定されていることに注意してください。これにより、ワーカーの数に関係なく、各ワーカーがper_worker_batch_sizeの例のバッチを処理するようになります。

現在のディレクトリには、両方の Python ファイルが含まれています。

ls *.py
main.py
mnist_setup.py

json はTF_CONFIG}をシリアル化し、環境変数に追加します。

os.environ['TF_CONFIG'] = json.dumps(tf_config)

これで、main.pyを実行し、TF_CONFIGを使用するワーカープロセスを起動できます。

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

上記のコマンドについて注意すべき点がいくつかあります。

  1. ノートブック 「マジック」 である%%bashを使用して、いくつかの bash コマンドを実行します。
  2. このワーカーは終了しないため、--bgフラグを使用してbashプロセスをバックグラウンドで実行します。 このワーカーは始める前にすべてのワーカーを待ちます。

バックグラウンドのワーカープロセスはこのノートブックに出力を出力しないため、&> で出力をファイルにリダイレクトし、何が起こったかを検査できます。

プロセスが開始するまで数秒待ちます。

import time
time.sleep(10)

これまでにワーカーのログファイルに出力されたものを検査します。

cat job_0.log
2022-08-08 21:40:03.186987: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-08-08 21:40:03.925790: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:40:03.926023: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:40:03.926040: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
2022-08-08 21:40:04.976492: E tensorflow/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

ログファイルの最後の行は Started server with target: grpc://localhost:12345であるはずです。最初のワーカーは準備が整い、他のすべてのワーカーの準備が整うのを待っています。

2番目のワーカーのプロセスを始めるようにtf_configを更新します。

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。

python main.py
2022-08-08 21:40:13.289998: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-08-08 21:40:14.038354: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:40:14.038591: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:40:14.038605: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
2022-08-08 21:40:15.067417: E tensorflow/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-08-08 21:40:16.172745: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-08-08 21:40:16.427032: W tensorflow/core/framework/dataset.cc:769] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 40ms/step - loss: 2.2697 - accuracy: 0.2098
Epoch 2/3
70/70 [==============================] - 3s 39ms/step - loss: 2.1980 - accuracy: 0.4135
Epoch 3/3
70/70 [==============================] - 3s 40ms/step - loss: 2.1140 - accuracy: 0.5693

最初のワーカーにより書き込まれたログを再確認すると、そのモデルのトレーニングに参加していることがわかります。

cat job_0.log
2022-08-08 21:40:03.186987: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-08-08 21:40:03.925790: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:40:03.926023: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvrtc.so.11.1: cannot open shared object file: No such file or directory
2022-08-08 21:40:03.926040: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
2022-08-08 21:40:04.976492: E tensorflow/stream_executor/cuda/cuda_driver.cc:265] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-08-08 21:40:16.169920: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-08-08 21:40:16.424122: W tensorflow/core/framework/dataset.cc:769] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 40ms/step - loss: 2.2697 - accuracy: 0.2098
Epoch 2/3
70/70 [==============================] - 3s 39ms/step - loss: 2.1980 - accuracy: 0.4135
Epoch 3/3
70/70 [==============================] - 3s 40ms/step - loss: 2.1140 - accuracy: 0.5693

当然ながら、これはこのチュートリアルの最初に実行したテストよりも実行速度が劣っています

単一のマシンで複数のワーカーを実行しても、オーバーヘッドが追加されるだけです。

ここではトレーニングの時間を改善することではなく、マルチワーカートレーニングの例を紹介することを目的としています。

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

マルチワーカートレーニングの詳細

ここまで、基本的なマルチワーカーのセットアップの実行を見てきました。

このチュートリアルの残りの部分では、実際のユースケースで役立つ重要な要素について詳しく説明します。

データセットのシャーディング

マルチワーカートレーニングでは、コンバージェンスとパフォーマンスを確保するために、データセットのシャーディングが必要です。

前のセクションの例は、tf.distribute.Strategy API により提供されるデフォルトの自動シャーディングに依存しています。tf.data.experimental.DistributeOptionstf.data.experimental.AutoShardPolicy を設定することで、シャーディングを制御できます。

自動シャーディングの詳細については、分散入力ガイドをご覧ください。

自動シャーディングをオフにして、各レプリカがすべての例を処理する方法の簡単な例を次に示します(推奨されません)。

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

評価

validation_dataModel.fit に渡すと、エポックごとにトレーニングと評価が交互に行われるようになります。validation_data を取る評価は同じセットのワーカー間で分散されているため、評価結果はすべてのワーカーが使用できるように集計されます。

トレーニングと同様に、評価データセットもファイルレベルで自動的にシャーディングされます。評価データセットにグローバルバッチサイズを設定し、validation_steps を設定する必要があります。

評価ではデータセットを繰り返すことも推奨されます。

Alternatively, you can also create another task that periodically reads checkpoints and runs the evaluation. This is what Estimator does. But this is not a recommended way to perform evaluation and thus its details are omitted.

性能

これで、MultiWorkerMirroredStrategy を使ってマルチワーカーで実行するようにセットアップされた Keras モデルの準備ができました。

マルチワーカートレーニングのパフォーマンスを調整するには、次を行うことができます。

  • tf.distribute.MultiWorkerMirroredStrategy には複数の集合体通信実装が用意されています。

    • RING は、クロスホスト通信レイヤーとして、gRPC を使用したリング状の集合体を実装します。
    • NCCLNVIDIA Collective Communication Library を使用して集合体を実装します。
    • AUTO は、選択をランタイムに任せます。

    集合体の最適な実装は、GPU の数、GPU の種類、およびクラスタ内のネットワーク相互接続によって異なります。自動選択をオーバーライドするには、MultiWorkerMirroredStrategy のコンストラクタの communication_options パラメータを以下のようにして指定します。

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • 可能であれば、変数を tf.float にキャストします。

    • 公式の ResNet モデルには、どのようにしてこれを行うかのが示されています。

フォールトトレランス

同期トレーニングでは、ワーカーが 1 つでも失敗し、障害復旧の仕組みが存在しない場合、クラスタは失敗します。

Keras をtf.distribute.Strategyで使用する場合、ワーカーが停止した場合や不安定である際に、フォールトトラレンスが機能するというメリットがあります。この機能は、指定された分散ファイルシステムにトレーニングの状態を保存するため、失敗、または、中断されたインスタンスを再開する場合に、トレーニングの状態が復旧されます。

When a worker becomes unavailable, other workers will fail (possibly after a timeout). In such cases, the unavailable worker needs to be restarted, as well as other workers that have failed.

注意: 以前は、ModelCheckpoint コールバックには、マルチワーカートレーニングに失敗したジョブを再開したときに、トレーニングの状態を復元するメカニズムがありました。新たに導入される BackupAndRestore コールバックでは、一貫したエクスペリエンスを提供するために、シングルワーカートレーニングにもこのサポートが追加され、既存の ModelCheckpoint コールバックからフォールトトレランス機能が削除されました。今後、この動作に依存するアプリケーションは、新しい BackupAndRestore コールバックに移行する必要があります。

ModelCheckpoint コールバック

ModelCheckpointコールバックは、フォールトトレランス機能を提供しなくなりました。代わりに BackupAndRestoreコールバックを使用してください。

ModelCheckpointコールバックを使用してチェックポイントを保存することは、依然として可能です。ただし、これを使用する場合、トレーニングが中断されるか、問題なく終了した場合、チェックポイントからトレーニングを続行するには、手動でモデルを読み込まなければなりません。

オプションで、ユーザーはModelCheckpointコールバックの外部でモデル/重みを保存および復元することを選択できます。

モデルの保存と読み込み

model.save または tf.saved_model.save を使用してモデルを保存するには、ワーカーごとに異なる保存先が必要となります。

  • チーフワーカー以外のワーカーの場合、モデルを一時ディレクトリに保存する必要があります。
  • チーフワーカーの場合、指定されたモデルのディレクトリに保存する必要があります。

ワーカーの一時ディレクトリは、複数のワーカーが同じ場所に書き込もうとしてエラーが発生しないように、一意のディレクトリである必要があります。

すべてのディレクトリに保存されるモデルは同一のものであり、復元やサービングで参照されるのは一般的に、チーフワーカーが保存したモデルです。

トレーニングが完了したらワーカーが作成した一時ディレクトリを削除するクリーンアップロジックを用意しておく必要があります。

チーフとワーカーを同時に保存する必要があるのは、チェックポイント中に変数を集計する可能性があり、チーフとワーカーの両方が allreduce 通信プロトコルに参加する必要があるためです。しかしながら、チーフとワーカーを同じモデルディレクトリに保存すると競合が発生し、エラーとなります。

MultiWorkerMirroredStrategy を使用すると、プログラムはワーカーごとに実行され、現在のワーカーがチーフであるかを知る際には、task_typetask_id の属性があるクラスタレゾルバオブジェクトが利用されます。

  • task_type から、現在のジョブが何であるか('worker' など)を知ることができます。
  • task_id から、ワーカーの ID を得られます。
  • task_id == 0 のワーカーはチーフワーカーです。

以下のコードスニペットの write_filepath 関数は、書き込みのファイルパスを指定します。このパスはワーカーの task_id によって異なります。

  • チーフワーカー(task_id == 0)の場合は、元のファイルパスに書き込みます。
  • それ以外のワーカーの場合は、書き込むディレクトリパスに task_id を指定して、一時ディレクトリ(temp_dir)を作成します。
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

これで、保存の準備ができました。

multi_worker_model.save(write_model_path)
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op while saving (showing 1 of 1). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

前述したように、後でモデルを読み込む場合、チーフが保存した場所にあるモデルのみを使用するべきなので、非チーフワーカーが保存した一時的なモデルは削除します。

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

読み込む際に便利な tf.keras.models.load_model API を使用して、以降の作業に続けることにします。

ここでは、単一ワーカーのみを使用してトレーニングを読み込んで続けると仮定します。この場合、別の strategy.scope() 内で tf.keras.models.load_model を呼び出しません(前に定義したように、strategy = tf.distribute.MultiWorkerMirroredStrategy() です)。

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 8ms/step - loss: 2.3173 - accuracy: 0.2086
Epoch 2/2
20/20 [==============================] - 0s 7ms/step - loss: 2.2970 - accuracy: 0.2391
<keras.callbacks.History at 0x7f3dd1bb13d0>

チェックポイントの保存と復元

一方、チェックポイントを作成すれば、モデルの重みを保存し、モデル全体を保存せずともそれらを復元することが可能です。

ここでは、モデルをトラッキングする tf.train.Checkpoint を 1 つ作成します。これは tf.train.CheckpointManager によって管理されるため、最新のチェックポイントのみが保存されます。

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

CheckpointManager の準備ができたら、チェックポイントを保存し、チーフ以外のワーカーが保存したチェックポイントを削除します。

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

これで、復元する必要があれば、便利なtf.train.latest_checkpoint関数を使用して、保存された最新のチェックポイントを見つけることができるようになりました。チェックポイントが復元されると、トレーニングを続行することができます。

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-08-08 21:40:30.732346: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-08-08 21:40:30.970308: W tensorflow/core/framework/dataset.cc:769] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 7ms/step - loss: 2.3103 - accuracy: 0.2562
Epoch 2/2
20/20 [==============================] - 0s 8ms/step - loss: 2.2975 - accuracy: 0.2828
<keras.callbacks.History at 0x7f3e7ffd0d30>

BackupAndRestore コールバック

tf.keras.callbacks.BackupAndRestore コールバックはフォールトトレランス機能を提供します。この機能はモデルと現在のエポック番号を一時チェックポイントファイルをbackup_dir引数でバックアップし、BackupAndRestoreでコールバックします。これは、各エポックの終了時に実行されます。

ジョブが中断されて再開されると、コールバックは最新のチェックポイントを復元するため、中断されたエポックの始めからトレーニングを続行することができます。未完了のエポックで中断前に実行された部分のトレーニングは破棄されるため、モデルの最終状態に影響することはありません。

これを使用するには、Model.fit 呼び出し時に、 Model.fit のインスタンスを指定します。

MultiWorkerMirroredStrategy では、ワーカーが中断されると、そのワーカーが再開するまでクラスタ全体が一時停止されます。そのワーカーが再開するとほかのワーカーも再開します。中断したワーカーがクラスタに参加し直すと、各ワーカーは以前に保存されたチェックポイントファイルを読み取って以前の状態を復元するため、クラスタの同期状態が戻ります。そして、トレーニングが続行されます。

BackupAndRestore コールバックは、CheckpointManager を使用して、トレーニングの状態を保存・復元します。これには、既存のチェックポイントを最新のものと併せて追跡するチェックポイントと呼ばれるファイルが生成されます。このため、ほかのチェックポイントの保存に backup_dir を再利用しないようにし、名前の競合を回避する必要があります。

現在、BackupAndRestore コールバックは、ストラテジーなしのシングルワーカートレーニング(MirroredStrategy)と MultiWorkerMirroredStrategy によるマルチワーカートレーニングをサポートしています。

以下に、マルチワーカートレーニングとシングルワーカートレーニングの 2 つの例を示します。

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-08-08 21:40:33.977542: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 8ms/step - loss: 2.2729 - accuracy: 0.1612
Epoch 2/3
70/70 [==============================] - 1s 8ms/step - loss: 2.2193 - accuracy: 0.3165
Epoch 3/3
70/70 [==============================] - 1s 8ms/step - loss: 2.1604 - accuracy: 0.4259
<keras.callbacks.History at 0x7f3dd17aa670>

BackupAndRestore に指定した backup_dir のディレクトリを検査すると、一時的に生成されたチェックポイントファイルがいくつかあることに気づくでしょう。これらのファイルは、以前に失われたインスタンスの復元に必要なもので、トレーニングが正常に終了した時点で、Model.fit の最後にライブラリによって削除されます。

注意: 現在のところ、BackupAndRestore コールバックは Eager モードのみをサポートしています。Graph モードでは、前述した保存/復元モデルを使用し、Model.fitinitial_epoch を指定することで実施することを検討してください。

追加リソース

  1. TensorFlow での分散型トレーニングガイドでは、利用可能な分散ストラテジーの概要が説明されています。
  2. Keras によるカスタムトレーニングループと MultiWorkerMirroredStrategy のチュートリアルでは、Keras とカスタムトレーニングループでMultiWorkerMirroredStrategy を使用する方法が説明されています。
  3. 公式モデルをご覧ください。この多くは、複数の分散ストラテジーを実行するように構成できます。
  4. tf.function を使ったパフォーマンスの改善ガイドでは、その他のストラテジーや、TensorFlow モデルのパフォーマンスを最適化するために使用できる TensorFlow Profiler といったツールに関する情報が提供されています。