このページは Cloud Translation API によって翻訳されました。
Switch to English

Kerasを使用したマルチワーカートレーニング

TensorFlow.orgで見る Google Colabで実行 GitHubでソースを表示する ノートブックをダウンロード

概観

このチュートリアルでは、 tf.distribute.Strategy API、具体的にはtf.distribute.experimental.MultiWorkerMirroredStrategyを使用したtf.distribute.Strategyモデルによるマルチワーカー分散トレーニングをtf.distribute.Strategyます。この戦略の助けを借りて、シングルワーカーで実行するように設計されたKerasモデルは、最小限のコード変更で複数のワーカーでシームレスに作業できます。

TensorFlowでの分散トレーニングガイドは、 tf.distribute.Strategy APIをより深く理解しtf.distribute.Strategy人のためにTensorFlowがサポートする分散戦略の概要について利用できます。

セットアップ

まず、TensorFlowと必要なインポートをセットアップします。

 import os
import tensorflow as tf
import numpy as np
 

データセットを準備しています

それでは、MNISTデータセットを準備しましょう。 MNISTデータセットは、手書きの数字0〜9の60,000のトレーニング例と10,000のテスト例で構成され、28x28ピクセルのモノクロ画像としてフォーマットされています。この例では、データセットのトレーニング部分を取り上げて説明します。

 def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # We need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset
 

Kerasモデルを構築する

ここでは、 tf.keras.Sequential APIを使用して、単純な畳み込みニューラルネットワークのKerasモデルを構築およびコンパイルし、MNISTデータセットでトレーニングします。

 def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
 

まず、少数のエポックについてモデルをトレーニングし、単一のワーカーで結果を観察して、すべてが正しく機能することを確認しましょう。エポックが進むにつれ、損失が減少し、精度が1.0に近づくと期待する必要があります。

 per_worker_batch_size = 64
single_worker_dataset = mnist_dataset(per_worker_batch_size)
single_worker_model = build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
 
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 0s 2ms/step - loss: 2.2701 - accuracy: 0.2451
Epoch 2/3
70/70 [==============================] - 0s 2ms/step - loss: 2.1827 - accuracy: 0.4777
Epoch 3/3
70/70 [==============================] - 0s 2ms/step - loss: 2.0865 - accuracy: 0.5955

<tensorflow.python.keras.callbacks.History at 0x7fc59381ac50>

マルチワーカー構成

次に、マルチワーカートレーニングの世界に入りましょう。 TensorFlowでは、 TF_CONFIG環境変数が複数のマシンでのトレーニングに必要であり、それぞれが異なる役割を持つ可能性があります。 TF_CONFIGは、クラスターの一部である各ワーカーのクラスター構成を指定するために使用されるJSON文字列です。

TF_CONFIG clustertask 2つのコンポーネントがありtaskclusterは、 workerなどのさまざまなタイプのジョブで構成されるdictであるトレーニングクラスターに関する情報を提供します。マルチ労働者の訓練ではMultiWorkerMirroredStrategy 、通常、1つがありworkerのチェックポイントを保存し、定期的なものに加えて、TensorBoardの集計ファイルの書き込みのようにもう少し責任を取るworker行います。このような労働者は、次のように呼ばれているchief労働者、そしてあることが通例であるworkerindex 0がチーフに任命されたworker (実際には、これはどのようにあるtf.distribute.Strategy実装されています)。 task一方では、現在のタスクの情報を提供します。最初のコンポーネントclusterはすべてのワーカーで同じであり、2番目のコンポーネントtaskは各ワーカーで異なり、そのワーカーのtypeindexを指定します。

この例では、タスクtype"worker"し、タスクindex0ます。これは、そのような設定を持つマシンが最初の作業者であり、主任作業員に任命され、他の作業者よりも多くの作業を行うことを意味します。他のマシンにもTF_CONFIG環境変数を設定する必要があり、同じcluster TF_CONFIGを持っている必要がありclusterが、それらのマシンの役割に応じてタスクtypeまたはタスクindexが異なります。

説明のために、このチュートリアルでは、 localhostで2つのワーカーをTF_CONFIGしてTF_CONFIGを設定する方法を示します。実際には、ユーザーは外部IPアドレス/ポートに複数のワーカーを作成し、各ワーカーにTF_CONFIGを適切に設定します。

 os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
 

この例では学習率が固定されていますが、一般に、グローバルバッチサイズに基づいて学習率を調整する必要がある場合があります。

適切な戦略を選択する

TensorFlowでは、分散トレーニングは、トレーニングのステップがワーカーとレプリカ間で同期される同期トレーニングと、トレーニングステップが厳密に同期されない非同期トレーニングで構成されます。

このガイドでは、マルチワーカー同期トレーニングの推奨戦略であるMultiWorkerMirroredStrategyについて説明します。モデルをトレーニングするには、 tf.distribute.experimental.MultiWorkerMirroredStrategyインスタンスを使用します。 MultiWorkerMirroredStrategyは、すべてのワーカーの各デバイスのモデルのレイヤーにあるすべての変数のコピーを作成します。集団通信のためのTensorFlow演算であるCollectiveOps使用して、勾配を集約し、変数の同期を維持します。 tf.distribute.Strategyガイドには、この戦略の詳細が記載されています。

 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
 
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:GPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO

MultiWorkerMirroredStrategyは、 CollectiveCommunicationパラメーターを介して複数の実装を提供します。 RINGは、クロスホスト通信レイヤーとしてgRPCを使用してリングベースの集合を実装します。 NCCLNvidiaのNCCLを使用して集合を実装します。 AUTOは選択をランタイムに委ねます。集合的な実装の最良の選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続に依存します。

MultiWorkerMirroredStrategyでモデルをトレーニングする

統合によりtf.distribute.StrategyにAPI tf.keras 、唯一のあなたは、マルチ労働者がモデル構築と囲むさにトレーニングを配布するようになります変更model.compile()内部コールstrategy.scope()分散戦略のスコープは、変数が作成される方法と場所を決定しますMultiWorkerMirroredStrategyの場合、作成される変数はMirroredVariableであり、各ワーカーで複製されます。

 num_workers = 4

# Here the batch size scales up by number of workers since 
# `tf.data.Dataset.batch` expects the global batch size. Previously we used 64, 
# and now this becomes 128.
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = build_and_compile_cnn_model()

# Keras' `model.fit()` trains the model with specified number of epochs and
# number of steps per epoch. Note that the numbers here are for demonstration
# purposes only and may not sufficiently produce a model with good quality.
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
 
Epoch 1/3
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
70/70 [==============================] - 0s 3ms/step - loss: 2.2682 - accuracy: 0.2265
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1714 - accuracy: 0.4954
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.0638 - accuracy: 0.6232

<tensorflow.python.keras.callbacks.History at 0x7fc5f4f062e8>

データセットのシャーディングとバッチサイズ

MultiWorkerMirroredStrategyマルチワーカートレーニングでは、収束とパフォーマンスを確保するためにデータセットのシャーディングが必要です。ただし、上記のコードスニペットでは、データセットはmodel.fit()する必要なく直接model.fit()渡されることに注意してください。これは、 tf.distribute.Strategy APIがデータセットのシャーディングを自動的に処理するためです。ファイルレベルでデータセットを分割し、歪んだ断片を作成する場合があります。ファイルが1つしかない極端な場合、最初のシャード(つまりワーカー)だけがトレーニングまたは評価データを取得し、結果としてすべてのワーカーがエラーを受け取ります。

トレーニングに手動シャーディングが必要な場合は、 tf.data.experimental.DistributeOptions apiをtf.data.experimental.DistributeOptionsして自動シャーディングをオフにできます。具体的には、

 options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
 

もう1つ注意すべき点は、 datasetsバッチサイズです。コードスニペットに、上記我々が使用global_batch_size = per_worker_batch_size * num_workersある、 num_workersそれは単一作業者であった場合に限り大きく回、有効当たりワーカーバッチサイズは、グローバルバッチサイズであるため、(パラメータが渡されたtf.data.Dataset.batch() )をワーカー数で割った値です。この変更により、ワーカーあたりのバッチサイズを以前と同じに保ちます。

評価

validation_datamodel.fit 、エポックごとにトレーニングと評価が交互に行われます。 validation_dataを取得validation_data評価は、同じワーカーセットに分散され、評価結果が集計され、すべてのワーカーが利用できます。トレーニングと同様に、検証データセットはファイルレベルで自動的に分割されます。あなたは、検証データセットとセットにグローバルなバッチサイズを設定する必要がありますvalidation_steps 。繰り返しデータセットも評価に推奨されます。

または、チェックポイントを定期的に読み取り、評価を実行する別のタスクを作成することもできます。これがEstimatorが行うことです。ただし、これは評価を実行するための推奨される方法ではないため、詳細は省略されます。

予測

現在、 model.predictmodel.predictでは機能しませんMultiWorkerMirroredStrategy.

パフォーマンス

これで、 MultiWorkerMirroredStrategyして複数のワーカーで実行するように設定されたMultiWorkerMirroredStrategyモデルが作成されました。 MultiWorkerMirroredStrategyを使用してマルチワーカートレーニングのパフォーマンスを調整するには、次のテクニックを試すことができます。

  • MultiWorkerMirroredStrategyは、複数の集団通信の実装を提供します。 RINGは、gRPCをクロスホスト通信レイヤーとして使用して、リングベースの集合を実装します。 NCCLNvidiaのNCCLを使用して集合を実装します。 AUTOは選択をランタイムに委ねます。集合実装の最良の選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続によって異なります。自動選択をオーバーライドするには、 MultiWorkerMirroredStrategyのコンストラクターのcommunicationパラメーターに有効な値を指定します(例: communication=tf.distribute.experimental.CollectiveCommunication.NCCL
  • tf.float 、変数をtf.floatキャストします。公式のResNetモデルにはこれを行う方法のが含まれています。

耐障害性

同期トレーニングでは、ワーカーの1つが失敗し、障害回復メカニズムが存在しない場合、クラスターは失敗します。 tf.distribute.Strategytf.distribute.Strategy使用すると、ワーカーが死亡したり不安定になったりした場合のフォールトトレランスの利点が得られます。これは、選択した分散ファイルシステムでトレーニング状態を保持することによって行われます。これにより、以前に失敗または横取りされたインスタンスの再起動時に、トレーニング状態が回復されます。

すべてのワーカーはトレーニングエポックとステップに関して同期が保たれているため、他のワーカーは、失敗した、または横取りされたワーカーが再起動して再開するのを待つ必要があります。

ModelCheckpointコールバック

ModelCheckpointコールバックはフォールトトレランス機能を提供しなくなりました。代わりにBackupAndRestoreコールバックを使用してください。

ModelCheckpointコールバックは、チェックポイントを保存するために引き続き使用できます。ただし、これにより、トレーニングが中断された、または正常に終了した場合、チェックポイントからトレーニングを続行するには、モデルを手動でロードする必要があります。オプションで、ユーザーはModelCheckpointコールバックの外でモデル/重みを保存および復元することを選択できます。

モデルの保存と読み込み

model.saveまたはtf.saved_model.saveを使用してモデルを保存するには、保存先をワーカーごとに変える必要があります。チーフ以外のワーカーでは、モデルを一時ディレクトリに保存し、チーフでは、提供されたモデルディレクトリに保存する必要があります。複数のワーカーが同じ場所に書き込もうとすることによって発生するエラーを防ぐために、ワーカーの一時ディレクトリは一意である必要があります。すべてのディレクトリに保存されたモデルは同一であり、通常、チーフによって保存されたモデルのみを復元または提供するために参照する必要があります。トレーニングが完了すると、ワーカーによって作成された一時ディレクトリを削除するクリーンアップロジックを用意することをお勧めします。

チーフとワーカーを同時に保存する必要があるのは、チェックポイント中に変数を集約する可能性があるためです。これにより、チーフとワーカーの両方がallreduce通信プロトコルに参加する必要があります。一方、チーフとワーカーを同じモデルディレクトリに保存すると、競合が原因でエラーが発生します。

MultiWorkerMirroredStrategy 、プログラムはすべての労働者に実行され、現在の労働者がチーフであるかどうかを知るために、我々は属性があり、クラスタのリゾルバオブジェクトを利用task_typetask_idtask_typeは現在のジョブが何であるか(たとえば、「worker」)を示し、 task_idはワーカーの識別子を示します。 ID 0のワーカーは、チーフワーカーとして指定されています。

以下のコードスニペットでは、 write_filepathは、ワーカーIDに応じて、書き込むファイルパスを提供します。チーフ(ID 0のワーカー)の場合、元のファイルパスに書き込みます。その他の場合は、書き込むための一時ディレクトリ(ディレクトリパスにidを含む)を作成します。

 model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works 
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
 

これで、保存する準備が整いました。

 multi_worker_model.save(write_model_path)
 
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Model.state_updates (from tensorflow.python.keras.engine.training) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/tracking/tracking.py:111: Layer.updates (from tensorflow.python.keras.engine.base_layer) is deprecated and will be removed in a future version.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

上記で説明したように、後でモデルは保存されたパスのチーフからのみロードする必要があるため、チーフ以外のワーカーが保存した一時的なものを削除してみましょう。

 if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))
 

さて、ロードする時がtf.keras.models.load_model 、便利なtf.keras.models.load_model APIを使用して、さらに作業を続けましょう。ここでは、トレーニングをロードして続行するために単一のワーカーのみを使用することを想定しています。この場合、別のstrategy.scope()内でtf.keras.models.load_model呼び出しません。

 loaded_model = tf.keras.models.load_model(model_path)

# Now that we have the model restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9825 - accuracy: 0.1102
Epoch 2/2
20/20 [==============================] - 0s 2ms/step - loss: 1.9367 - accuracy: 0.1117

<tensorflow.python.keras.callbacks.History at 0x7fc5f4b0d8d0>

チェックポイントの保存と復元

一方、チェックポイント機能を使用すると、モデル全体を保存しなくても、モデルの重みを保存して復元できます。ここでは、モデルを追跡する1つのtf.train.Checkpointを作成します。これは、 tf.train.CheckpointManagerによって管理されるため、最新のチェックポイントのみが保持されます。

 checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
  checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
 

CheckpointManagerを設定したら、保存して、チーフ以外のワーカーが保存したチェックポイントを削除する準備が整います。

 checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)
 

これで、復元が必要になったときに、便利なtf.train.latest_checkpoint関数を使用して保存された最新のチェックポイントを見つけることができます。チェックポイントを復元した後、トレーニングを続行できます。

 latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
 
Epoch 1/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9841 - accuracy: 0.6561
Epoch 2/2
20/20 [==============================] - 0s 3ms/step - loss: 1.9445 - accuracy: 0.6805

<tensorflow.python.keras.callbacks.History at 0x7fc5f49d9d30>

BackupAndRestoreコールバック

BackupAndRestoreコールバックは、下の一時的なチェックポイント・ファイル内のモデルと現在のエポック番号をバックアップすることで、フォールトトレランス機能を提供しbackup_dirの引数BackupAndRestore 。これは、各エポックの終わりに行われます。

ジョブが中断されて再起動すると、コールバックは最後のチェックポイントを復元し、中断されたエポックの最初からトレーニングが続行されます。中断前の未完成のエポックで既に行われた部分的なトレーニングはすべて破棄されるため、最終的なモデルの状態には影響しません。

これを使用するには、 tf.keras.Model.fit()呼び出しでtf.keras.callbacks.experimental.BackupAndRestoreインスタンスを提供します。

MultiWorkerMirroredStrategyでは、ワーカーが中断されると、中断されたワーカーが再起動されるまでクラスター全体が一時停止します。他のワーカーも再起動し、中断されたワーカーがクラスターに再度参加します。次に、すべてのワーカーが以前に保存されたチェックポイントファイルを読み取り、以前の状態を取得して、クラスターの同期を取り戻します。その後、トレーニングが続行されます。

BackupAndRestoreコールバックは、 CheckpointManagerを使用してトレーニング状態を保存および復元します。これにより、最新のチェックポイントと一緒に既存のチェックポイントを追跡するチェックポイントと呼ばれるファイルが生成されます。このため、名前の衝突を回避するために、 backup_dirを他のチェックポイントの保存に再利用しないでください。

現在、 BackupAndRestoreコールバックは、戦略のない単一のワーカー、MirroredStrategy、およびMultiWorkerMirroredStrategyのマルチワーカーをサポートしています。以下は、マルチワーカートレーニングとシングルワーカートレーニングの2つの例です。

 # Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
 
Epoch 1/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2837 - accuracy: 0.1836
Epoch 2/3
70/70 [==============================] - 0s 3ms/step - loss: 2.2131 - accuracy: 0.4091
Epoch 3/3
70/70 [==============================] - 0s 3ms/step - loss: 2.1310 - accuracy: 0.5485

<tensorflow.python.keras.callbacks.History at 0x7fc5f49a3080>

あなたがのディレクトリ調べる場合はbackup_dirあなたが指定したBackupAndRestore 、あなたには、いくつかの一時的に生成チェックポイントファイルに気づくことがあります。これらのファイルは、以前に失われたインスタンスを回復するために必要であり、トレーニングが正常に終了すると、 tf.keras.Model.fit()最後にライブラリによって削除されます。

こちらもご覧ください

  1. TensorFlowでの分散トレーニングガイドは、利用可能な分散戦略の概要を提供します。
  2. 公式モデル 。その多くは、複数の配布戦略を実行するように構成できます。
  3. このガイドのパフォーマンスセクションでは、TensorFlowモデルのパフォーマンスを最適化するために使用できる他の戦略とツールに関する情報を提供しています。