Keras を使ったマルチワーカートレーニング

TensorFlow.org で表示 Google Colabで実行 GitHubでソースを表示 ノートブックをダウンロード

概要

このチュートリアルでは、tf.distribute.Strategy API を使用して、Keras モデルと Model.fit API によるマルチワーカー分散型トレーニングを実演します。このストラテジーにより、単一のワーカーで実行するように設計された Keras モデルは、最小限のコード変更で複数のワーカーでシームレスに機能することができます。

To learn how to use the MultiWorkerMirroredStrategy with Keras and a custom training loop, refer to Custom training loop with Keras and MultiWorkerMirroredStrategy.

このチュートリアルには、デモ用に 2 つのワーカーを含む最小限のマルチワーカーの例が含まれています。

適切なストラテジーを選択する

始める前に、アクセラレータとトレーニングに tf.distribute.MultiWorkerMirroredStrategy が適切な選択であることを確認してください。これらは、データ並列処理を使用してトレーニングを分散する 2 つの一般的な方法です。

TPU を使用しないマルチワーカーの同期トレーニングには、tf.distribute.experimental.MultiWorkerMirroredStrategy を使用します。これは、すべてのワーカーの各デバイスにあるモデルのレイヤーにすべての変数のコピーを作成します。集合通信に使用する TensorFlow 演算子 CollectiveOps を使用して勾配を集め、変数の同期を維持します。集合実装オプションについては、tf.distribute.experimental.CommunicationOptions パラメータを確認してください。

tf.distribute.Strategy API の概要については、TensorFlow での分散トレーニングを参照してください。

セットアップ

まず、必要なものをインポートします。

import json
import os
import sys

TensorFlow をインポートする前に、環境にいくつかの変更を加えます。

  • 実際のアプリケーションでは、各ワーカーは異なるマシン上にあります。このチュートリアルでは、すべてのワーカーがこのマシンで実行されます。そのため、すべての GPU を無効にして、すべてのワーカーが同じ GPU を使用しようとすることによって発生するエラーを防ぎます。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  • TF_CONFIG 環境変数をリセットします(これについては後で詳しく説明します)。
os.environ.pop('TF_CONFIG', None)
  • 現在のディレクトリが Python のパス上にあることを確認してください。これにより、ノートブックは %%writefile で書き込まれたファイルを後でインポートできるようになります。
if '.' not in sys.path:
  sys.path.insert(0, '.')

tf-nightly をインストールします。TensorFlow 2.10 から tf.keras.callbacks.BackupAndRestoresave_freq 引数を使用した特定のステップでのチェックポイント保存頻度が導入されます。

pip install tf-nightly

最後に、TensorFlow をインポートします。

import tensorflow as tf
2022-12-14 20:00:31.684068: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay

データセットとモデルの定義

次に、単純なモデルとデータセットの設定を使用して mnist.py ファイルを作成します。この Python ファイルは、このチュートリアルのワーカープロセスによって使用されます。

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

シングルワーカーでのモデルのトレーニング

まず、少数のエポックでモデルをトレーニングし、シングルワーカーで結果を観察して、すべてが正しく機能していることを確認します。エポックが進むにつれ、損失が下降し、精度が 1.0 に近づくはずです。

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11490434/11490434 [==============================] - 0s 0us/step
2022-12-14 20:00:34.473671: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 7ms/step - loss: 2.2720 - accuracy: 0.2181
Epoch 2/3
70/70 [==============================] - 0s 6ms/step - loss: 2.1978 - accuracy: 0.4471
Epoch 3/3
70/70 [==============================] - 0s 6ms/step - loss: 2.1061 - accuracy: 0.5902
<keras.callbacks.History at 0x7fc1c84ef6a0>

マルチワーカー構成

では、マルチワーカートレーニングの世界を覗いてみましょう。

ジョブとタスクのクラスタ

TensorFlow では、分散トレーニングには、いくつかのジョブが含まれる 'cluster' があり、各ジョブには 1 つ以上の 'task' が含まれることがあります。

それぞれに異なる役割をもつ複数のマシンでトレーニングするには TF_CONFIG 環境変数が必要です。TF_CONFIG は JSON 文字列で、クラスタの一部である各ワーカーのクラスタ構成を指定するために使用されます。

TF_CONFIG 変数には、'cluster''task' の 2 つのコンポーネントがあります。

  • 'cluster' はすべてのワーカーに共通し、トレーニングクラスタに関する情報を、'worker' または 'chief' などのさまざまなジョブの種類で構成される dict として提供します。

    • tf.distribute.MultiWorkerMirroredStrategy によるマルチワーカートレーニングでは通常、'worker' が通常行うことのほかにチェックポイントの保存や TensorBoard 用のサマリーファイルの書き込みといった役割を果たす 1 つの 'worker' があります。こういった 'worker' はチーフワーカー (ジョブ名は 'chief') と呼ばれます。
    • 通常、'index' 0 を持つワーカーが 'chief' になります。
  • 'task' は現在のタスクの情報を提供し、ワーカーごとに異なります。タスクはそのワーカーの 'type''index' を指定します。

以下に構成例を示します。

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

tf_config は Python の単なるローカル変数であることに注意してください。トレーニング構成に使用するには、JSON としてシリアル化し、'TF_CONFIG' 環境変数に配置します。

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

上記の構成例では、タスク 'type''worker' に設定し、タスク 'index'0 に設定しています。そのため、このマシンが最初のワーカーとなり、'chief' ワーカーとして指定されます。

注意: 他のマシンにも TF_CONFIG 環境変数を設定し、同じ 'cluster' dict が必要となりますが、それらのマシンの役割に応じた異なるタスク 'type' またはタスク 'index' が必要となります。

実際には、外部 IP アドレス/ポートに複数のワーカーを作成し、それに応じて各ワーカーに TF_CONFIG 変数を設定します。このチュートリアルでは、デモとして localhost で 2 つのワーカーを使用して TF_CONFIG 変数を設定する方法を示します。

  • 最初の ('chief') ワーカーの TF_CONFIG は上記に示す通りです。
  • 2 つ目のワーカーでは、tf_config['task']['index']=1 を設定します。

ノートブックの環境変数とサブプロセス

サブプロセスは、親から環境変数を継承します。したがって、この Jupyter Notebook プロセスで環境変数を設定すると、次のようになります。

os.environ['GREETINGS'] = 'Hello TensorFlow!'

すると、サブプロセスからその環境変数にアクセスできます。

echo ${GREETINGS}
Hello TensorFlow!

次のセクションでは、これを使用して TF_CONFIG をワーカーサブプロセスに渡します。この方法で実際にジョブを起動することは決してありませんが、このチュートリアルで最小限のマルチワーカーの例を示すためには十分です。

モデルのトレーニング

モデルをトレーニングするには、まず tf.distribute.MultiWorkerMirroredStrategy のインスタンスを作成します。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

注意: MultiWorkerMirroredStrategy が呼び出されると、TF_CONFIG が解析され、TensorFlow の GRPC サーバーが開始します。そのため、TF_CONFIG 環境変数は、tf.distribute.Strategy インスタンスが作成される前に設定しておく必要があります。TF_CONFIG はまだ設定されていないため、上記のストラテジーは実質的にシングルワーカーのトレーニングです。

tf.kerastf.distribute.Strategy API を統合したため、トレーニングをマルチワーカーに分散するには、モデルビルディングと model.compile() 呼び出しを strategy.scope() 内に収めるように変更することだけが必要となりました。この分散ストラテジーのスコープは、どこでどのように変数が作成されるかを指定し、MultiWorkerMirroredStrategy の場合、作成される変数は MirroredVariable で、各ワーカーに複製されます。

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

注意: 現在のところ、MultiWorkerMirroredStrategy には、TensorFlow 演算子をストラテジーのインスタンスが作成された後に作成する必要があるという制限があります。RuntimeError: Collective ops must be configured at program startup が表示される場合は、プログラムのはじめに MultiWorkerMirroredStrategy のインスタンスを作成するようにし、演算子を作成するコードをストラテジーがインスタンス化される後に配置するようにしてください。

MultiWorkerMirroredStrategyで実際に実行するには、ワーカープロセスを実行し、TF_CONFIGをそれらに渡す必要があります。

前に記述したmnist_setup.pyファイルと同様に、各ワーカーが実行するmain.pyは次のとおりです。

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

上記のコードスニペットでは、Dataset.batchに渡されるglobal_batch_sizeper_worker_batch_size * num_workersに設定されていることに注意してください。これにより、ワーカーの数に関係なく、各ワーカーがper_worker_batch_sizeの例のバッチを処理するようになります。

現在のディレクトリには、両方の Python ファイルが含まれています。

ls *.py
main.py
mnist_setup.py

JSON は TF_CONFIG をシリアル化し、環境変数に追加します。

os.environ['TF_CONFIG'] = json.dumps(tf_config)

これで、main.pyを実行し、TF_CONFIGを使用するワーカープロセスを起動できます。

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

上記のコマンドについて注意すべき点がいくつかあります。

  1. ノートブック 「マジック」 である%%bashを使用して、いくつかの bash コマンドを実行します。
  2. このワーカーは終了しないため、--bgフラグを使用してbashプロセスをバックグラウンドで実行します。 このワーカーは始める前にすべてのワーカーを待ちます。

バックグラウンドのワーカープロセスはこのノートブックに出力を出力しないため、&> で出力をファイルにリダイレクトし、何が起こったかを検査できます。

プロセスが開始するまで数秒待ちます。

import time
time.sleep(10)

これまでにワーカーのログファイルに出力されたものを検査します。

cat job_0.log
2022-12-14 20:00:37.557684: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
2022-12-14 20:00:39.352865: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

ログファイルの最後の行は Started server with target: grpc://localhost:12345 であるはずです。最初のワーカーは準備が整い、他のすべてのワーカーの準備が整うのを待っています。

2 番目のワーカーのプロセスを始めるように tf_config を更新します。

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。

python main.py
2022-12-14 20:00:47.666158: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
2022-12-14 20:00:49.465809: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-12-14 20:00:50.424484: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-12-14 20:00:50.702584: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 42ms/step - loss: 2.2913 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 3s 41ms/step - loss: 2.2551 - accuracy: 0.2313
Epoch 3/3
70/70 [==============================] - 3s 40ms/step - loss: 2.2144 - accuracy: 0.3311

最初のワーカーにより書き込まれたログを再確認すると、そのモデルのトレーニングに参加していることがわかります。

cat job_0.log
2022-12-14 20:00:37.557684: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
2022-12-14 20:00:39.352865: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-12-14 20:00:50.422449: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-12-14 20:00:50.700553: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 42ms/step - loss: 2.2913 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 3s 41ms/step - loss: 2.2551 - accuracy: 0.2313
Epoch 3/3
70/70 [==============================] - 3s 40ms/step - loss: 2.2144 - accuracy: 0.3311

注意: 1 台のマシンで複数のワーカーを実行するとオーバーヘッドが増えるため、これはこのチュートリアルの最初に実行されたテストよりも実行に時間がかかる可能性があります。ここでの目標は、トレーニング時間を改善することではなく、マルチワーカートレーニングの例を示すことです。

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

マルチワーカートレーニングの詳細

このチュートリアルでは、基本的なマルチワーカーのセットアップについて説明してきました。このドキュメントの残りの部分では、実際のユースケースに役立つ他の要因について詳しく見ていきます。

データセットのシャーディング

マルチワーカートレーニングでは、コンバージェンスとパフォーマンスを確保するために、データセットのシャーディングが必要です。

前のセクションの例は、tf.distribute.Strategy API により提供されるデフォルトの自動シャーディングに依存しています。tf.data.experimental.DistributeOptionstf.data.experimental.AutoShardPolicy を設定することで、シャーディングを制御できます。

自動シャーディングの詳細については、分散入力ガイドをご覧ください。

自動シャーディングをオフにして、各レプリカがすべての例を処理する方法の簡単な例を次に示します(推奨されません)。

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

評価する

validation_dataModel.fit に渡すと、エポックごとにトレーニングと評価が交互に行われるようになります。評価は同じセットのワーカー間で分散されているため、評価結果はすべてのワーカーが使用できるように集計されます。

トレーニングと同様に、評価データセットもファイルレベルで自動的にシャーディングされます。評価データセットにグローバルバッチサイズを設定し、validation_steps を設定する必要があります。

繰り返しのデータセットを評価することをお勧めします (tf.data.Dataset.repeat を呼び出します) 。

または、定期的にチェックポイントを読み取って評価を実行するもう 1 つのタスクを作成することもできます。これは Estimator が行うことですが、推奨される評価方法ではないため、ここでは詳細については触れません。

パフォーマンス

マルチワーカートレーニングのパフォーマンスを調整するには、次を行うことができます。

  • tf.distribute.MultiWorkerMirroredStrategy には複数の集合体通信実装が用意されています。

    • RING は、クロスホスト通信レイヤーとして、gRPC を使用したリング状の集合体を実装します。
    • NCCLNVIDIA Collective Communication Library を使用して集合体を実装します。
    • AUTO は、選択をランタイムに任せます。

    集合体の最適な実装は、GPU の数、GPU の種類、およびクラスタ内のネットワーク相互接続によって異なります。自動選択をオーバーライドするには、MultiWorkerMirroredStrategy のコンストラクタの communication_options パラメータを以下のようにして指定します。

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
    
  • 可能であれば、変数を tf.float にキャストします。

    • 公式の ResNet モデルには、どのようにしてこれを行うかのが示されています。

フォールトトレランス

同期トレーニングでは、ワーカーが 1 つでも失敗し、障害復旧の仕組みが存在しない場合、クラスタは失敗します。

Keras をtf.distribute.Strategyで使用する場合、ワーカーが停止した場合や不安定である際に、フォールトトラレンスが機能するというメリットがあります。この機能は、指定された分散ファイルシステムにトレーニングの状態を保存するため、失敗、または、中断されたインスタンスを再開する場合に、トレーニングの状態が復旧されます。

ワーカーが使用できなくなると、他のワーカーはエラーを発生します (おそらくタイムアウト後)。 このような場合、使用できないワーカー、およびエラーが発生した他のワーカーを再起動する必要があります。

注意: 以前は、ModelCheckpoint コールバックには、マルチワーカートレーニングに失敗したジョブを再開したときに、トレーニングの状態を復元するメカニズムがありました。新たに導入される BackupAndRestore コールバックでは、一貫したエクスペリエンスを提供するために、シングルワーカートレーニングにもこのサポートが追加され、既存の ModelCheckpoint コールバックからフォールトトレランス機能が削除されました。今後、この動作に依存するアプリケーションは、新しい BackupAndRestore コールバックに移行する必要があります。

ModelCheckpoint コールバック

ModelCheckpointコールバックは、フォールトトレランス機能を提供しなくなりました。代わりに BackupAndRestoreコールバックを使用してください。

ModelCheckpointコールバックを使用してチェックポイントを保存することは、依然として可能です。ただし、これを使用する場合、トレーニングが中断されるか、問題なく終了した場合、チェックポイントからトレーニングを続行するには、手動でモデルを読み込まなければなりません。

オプションで、ユーザーは ModelCheckpoint コールバックの外部でモデル/重みを保存および復元することを選択できます。

モデルの保存と読み込み

model.save または tf.saved_model.save を使用してモデルを保存するには、ワーカーごとに異なる保存先が必要となります。

  • チーフワーカー以外のワーカーの場合、モデルを一時ディレクトリに保存する必要があります。
  • チーフワーカーの場合、指定されたモデルのディレクトリに保存する必要があります。

ワーカーの一時ディレクトリは、複数のワーカーが同じ場所に書き込もうとしてエラーが発生しないように、一意のディレクトリである必要があります。

すべてのディレクトリに保存されるモデルは同一のものであり、復元やサービングで参照されるのは一般的に、チーフワーカーが保存したモデルです。

トレーニングが完了したらワーカーが作成した一時ディレクトリを削除するクリーンアップロジックを用意しておく必要があります。

チーフとワーカーを同時に保存する必要があるのは、チェックポイント中に変数を集計する可能性があり、チーフとワーカーの両方が allreduce 通信プロトコルに参加する必要があるためです。しかしながら、チーフとワーカーを同じモデルディレクトリに保存すると競合が発生し、エラーとなります。

MultiWorkerMirroredStrategy を使用すると、プログラムはワーカーごとに実行され、現在のワーカーがチーフであるかを知る際には、task_typetask_id の属性があるクラスタレゾルバオブジェクトが利用されます。

  • task_type から、現在のジョブが何であるか ('worker' など) を知ることができます。
  • task_id から、ワーカーの ID を得られます。
  • task_id == 0 のワーカーはチーフワーカーです。

以下のコードスニペットの write_filepath 関数は、書き込みのファイルパスを指定します。このパスはワーカーの task_id によって異なります。

  • チーフワーカー(task_id == 0)の場合は、元のファイルパスに書き込みます。
  • それ以外のワーカーの場合は、書き込むディレクトリパスに task_id を指定して、一時ディレクトリ(temp_dir)を作成します。
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configurations.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type` is `None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

これで、保存の準備ができました。

multi_worker_model.save(write_model_path)
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op, _update_step_xla while saving (showing 2 of 2). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

前述したように、後でモデルを読み込む場合、チーフが保存した場所にあるモデルのみを使用するべきなので、非チーフワーカーが保存した一時的なモデルは削除します。

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

読み込む際に便利な tf.keras.models.load_model API を使用して、以降の作業に続けることにします。

ここでは、シングルワーカーのみを使用してトレーニングを読み込んで続けると仮定します。この場合、別の strategy.scope() 内で tf.keras.models.load_model を呼び出しません (前に定義したように、strategy = tf.distribute.MultiWorkerMirroredStrategy() です)。

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 7ms/step - loss: 2.2914 - accuracy: 0.2000
Epoch 2/2
20/20 [==============================] - 0s 7ms/step - loss: 2.2754 - accuracy: 0.2422
<keras.callbacks.History at 0x7fc2c09f9670>

チェックポイントの保存と復元

一方、チェックポイントを作成すれば、モデルの重みを保存し、モデル全体を保存せずともそれらを復元することが可能です。

ここでは、モデルをトラッキングする tf.train.Checkpoint を 1 つ作成します。これは tf.train.CheckpointManager によって管理されるため、最新のチェックポイントのみが保存されます。

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

CheckpointManager の準備ができたら、チェックポイントを保存し、チーフ以外のワーカーが保存したチェックポイントを削除します。

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

これで、復元する必要があれば、便利なtf.train.latest_checkpoint関数を使用して、保存された最新のチェックポイントを見つけることができるようになりました。チェックポイントが復元されると、トレーニングを続行することができます。

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-12-14 20:01:05.395997: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-12-14 20:01:05.654507: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 7ms/step - loss: 2.2941 - accuracy: 0.2281
Epoch 2/2
20/20 [==============================] - 0s 7ms/step - loss: 2.2783 - accuracy: 0.2648
<keras.callbacks.History at 0x7fc2c0a6c880>

BackupAndRestore コールバック

tf.keras.callbacks.BackupAndRestore コールバックはフォールトトレランス機能を提供します。この機能はモデルと現在のエポック番号を一時チェックポイントファイルに backup_dir 引数でバックアップし、BackupAndRestore でコールバックします。

注意: Tensorflow 2.9 では、現在のモデルとトレーニング状態がエポック境界でバックアップされます。 tf-nightly バージョンおよび TensorFlow 2.10 以降では、BackupAndRestore コールバックはエポックまたはステップ境界でモデルとトレーニング状態をバックアップします。BackupAndRestore は、オプションの save_freq 引数を受け入れます。save_freq は、'epoch' または int 値のいずれかを受け入れます。save_freq'epoch' に設定されている場合、モデルはエポックごとにバックアップされます。save_freq0 より大きい整数値に設定されている場合、モデルは save_freq バッチごとにバックアップされます。

ジョブが中断されて再開されると、BackupAndRestore コールバックが最後のチェックポイントを復元し、トレーニング状態が最後に保存されたエポックとステップの最初からトレーニングを続行できます。

これを使用するには、Model.fit 呼び出し時に、 Model.fit のインスタンスを指定します。

MultiWorkerMirroredStrategy では、ワーカーが中断されると、そのワーカーが再開するまでクラスタ全体が一時停止されます。そのワーカーが再開すると他のワーカーも再開します。中断したワーカーがクラスタに参加し直すと、各ワーカーは以前に保存されたチェックポイントファイルを読み取って以前の状態を復元するため、クラスタの同期状態が戻ります。そして、トレーニングが続行されます。分散データセットの反復子の状態は再初期化され、復元されません。

The BackupAndRestore callback uses the CheckpointManager to save and restore the training state, which generates a file called checkpoint that tracks existing checkpoints together with the latest one. For this reason, backup_dir should not be re-used to store other checkpoints in order to avoid name collision.

現在、BackupAndRestore コールバックは、ストラテジーなしのシングルワーカートレーニング(MirroredStrategy)と MultiWorkerMirroredStrategy によるマルチワーカートレーニングをサポートしています。

Below are two examples for both multi-worker training and single-worker training:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback. The training state 
# is backed up at epoch boundaries by default.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-12-14 20:01:08.689450: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 8ms/step - loss: 2.2696 - accuracy: 0.2268
Epoch 2/3
70/70 [==============================] - 1s 8ms/step - loss: 2.2096 - accuracy: 0.3013
Epoch 3/3
70/70 [==============================] - 1s 8ms/step - loss: 2.1317 - accuracy: 0.4317
<keras.callbacks.History at 0x7fc2be65ddc0>

BackupAndRestore コールバックの save_freq 引数が 'epoch' に設定されている場合、モデルはエポックごとにバックアップされます。

# The training state is backed up at epoch boundaries because `save_freq` is
# set to `epoch`.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-12-14 20:01:13.452703: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 8ms/step - loss: 2.2786 - accuracy: 0.1719
Epoch 2/3
70/70 [==============================] - 1s 8ms/step - loss: 2.2316 - accuracy: 0.3955
Epoch 3/3
70/70 [==============================] - 1s 8ms/step - loss: 2.1827 - accuracy: 0.5406
<keras.callbacks.History at 0x7fc2c0b98f70>

注意: 次のコード ブロックでは、Tensorflow 2.10 がリリースされるまで tf-nightly でのみ利用可能な機能を使用します。

BackupAndRestore コールバックの save_freq 引数が 0 より大きい整数値に設定されている場合、モデルは save_freq バッチごとにバックアップされます。

# The training state is backed up at every 30 steps because `save_freq` is set
# to an integer value of `30`.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq=30)]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-12-14 20:01:18.217652: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 9ms/step - loss: 2.2709 - accuracy: 0.1958
Epoch 2/3
70/70 [==============================] - 1s 9ms/step - loss: 2.1935 - accuracy: 0.4922
Epoch 3/3
70/70 [==============================] - 1s 10ms/step - loss: 2.1001 - accuracy: 0.6498
<keras.callbacks.History at 0x7fc2be5221c0>

BackupAndRestore に指定した backup_dir のディレクトリを検査すると、一時的に生成されたチェックポイントファイルがいくつかあることに気づくでしょう。これらのファイルは、以前に失われたインスタンスの復元に必要なもので、トレーニングが正常に終了した時点で、Model.fit の最後にライブラリによって削除されます。

注意: 現在、BackupAndRestore コールバックは eager モードのみをサポートしています。グラフ モードでは、モデルの保存と復元に Model.save/tf.saved_model.savetf.keras.models.load_model を使用することを検討してください。それぞれ、上記のモデルの保存と読み込みセクションで説明されています。トレーニング中に Model.fitinitial_epoch を提供します。

追加リソース

  1. TensorFlow での分散型トレーニングガイドでは、利用可能な分散ストラテジーの概要が説明されています。
  2. Keras によるカスタムトレーニングループと MultiWorkerMirroredStrategy のチュートリアルでは、Keras とカスタムトレーニングループでMultiWorkerMirroredStrategy を使用する方法が説明されています。
  3. 公式モデルをご覧ください。この多くは、複数の分散ストラテジーを実行するように構成できます。
  4. tf.function を使ったパフォーマンスの改善ガイドでは、その他のストラテジーや、TensorFlow モデルのパフォーマンスを最適化するために使用できる TensorFlow Profiler といったツールに関する情報が提供されています。