Kerasによるマルチワーカートレーニング

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示 ノートブックをダウンロード

概要

このチュートリアルでは、Kerasモデルとしたマルチ労働者の分散訓練を実行する方法を示しModel.fit使用API tf.distribute.Strategy API-特にtf.distribute.MultiWorkerMirroredStrategyクラスを。この戦略の助けを借りて、単一のワーカーで実行するように設計されたKerasモデルは、最小限のコード変更で複数のワーカーでシームレスに作業できます。

のより深い理解に興味のある人のためにtf.distribute.StrategyのAPI、 TensorFlowの中に分散トレーニングガイドでは、流通戦略TensorFlowサポートの概要については、利用可能です。

使用方法についてはMultiWorkerMirroredStrategy Kerasおよびカスタムトレーニングループと、参照KerasとMultiWorkerMirroredStrategyとカスタムトレーニングループ

このチュートリアルの目的は、2人のワーカーを使用した最小限のマルチワーカーの例を示すことであることに注意してください。

設定

いくつかの必要なインポートから始めます。

import json
import os
import sys

TensorFlowをインポートする前に、環境にいくつかの変更を加えます。

  1. すべてのGPUを無効にします。これにより、すべてのワーカーが同じGPUを使用しようとすることによって発生するエラーが防止されます。実際のアプリケーションでは、各ワーカーは異なるマシン上にあります。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. リセットTF_CONFIG (後でこれについて詳しく説明します)環境変数を:
os.environ.pop('TF_CONFIG', None)
  1. 現在のディレクトリがPythonのパス-この上にあることを確認しますと、ノートブックは、によって書かれたファイルをインポートすることができます%%writefile以降:
if '.' not in sys.path:
  sys.path.insert(0, '.')

TensorFlowをインポートします。

import tensorflow as tf

データセットとモデルの定義

次に、作成mnist.py単純なモデルとデータセットの設定でファイルを。このPythonファイルは、このチュートリアルのワーカープロセスによって使用されます。

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

一人の労働者のモデルトレーニング

エポックの数が少ないためにモデルを訓練してみて、すべてが正常に動作することを確認するために、単一の労働者の結果を観察します。トレーニングが進むにつれて、損失は減少し、精度は向上するはずです。

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2021-08-20 01:21:51.478839: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:51.478914: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.478928: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:51.479029: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:51.479060: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:51.479067: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:51.480364: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Epoch 1/3
 1/70 [..............................] - ETA: 26s - loss: 2.3067 - accuracy: 0.0469
2021-08-20 01:21:52.316481: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
70/70 [==============================] - 1s 12ms/step - loss: 2.2829 - accuracy: 0.1667
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2281 - accuracy: 0.3842
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1625 - accuracy: 0.5348
<keras.callbacks.History at 0x7f633d957390>

マルチワーカー構成

それでは、マルチワーカートレーニングの世界に入りましょう。

ジョブとタスクを含むクラスター

:TensorFlowでは、分散型訓練が必要'cluster'複数のジョブとを、およびジョブのそれぞれは、一つ以上持って'task'秒。

あなたは必要になりますTF_CONFIG可能性が異なる役割をそれぞれ有する、複数のマシン上でトレーニングするための設定環境変数を。 TF_CONFIGクラスタの一部である各ワーカーのクラスタ構成を指定するために使用されるJSON文字列です。

二つの成分がありますTF_CONFIG :変数'cluster''task'

  • A 'cluster' 、すべての労働者のための同じであるとのようなジョブの異なるタイプからなる辞書で訓練クラスタに関する情報提供'worker''chief'

    • マルチ労働者の訓練ではtf.distribute.MultiWorkerMirroredStrategy 、通常、1つあり'worker'など、通常のものに加えて、チェックポイントを保存し、TensorBoardのサマリファイルを書くよう、責任を取る'worker'ありません。このような'worker' (ジョブ名を持つチーフ労働者と呼ばれている'chief' )。
    • のが通例である'chief'持っている'index' 0 (実際には、これはどのようにあるに任命さtf.distribute.Strategy実装されています)。
  • 'task'現在のタスクの情報を提供し、各ワーカーのために異なっています。これは、指定し'type''index'という労働者のを。

以下は構成例です。

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

ここでは同じであるTF_CONFIG JSON文字列としてシリアル化は:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

注意tf_config Pythonでちょうどローカル変数です。トレーニングの設定のためにそれを使用できるようにするには、この辞書は、JSONとしてシリアライズし、中に配置する必要がTF_CONFIG環境変数。

上記の設定例では、タスクセット'type''worker'とタスク'index'0 。したがって、このマシンは最初に労働者です。それはとして任命される'chief'労働者や他の人よりも多くの作業を行います。

説明のために、このチュートリアルのショーは、あなたが設定できるかTF_CONFIG上の2人の労働者と変数をlocalhost

実際には、外部のIPアドレス/ポート上で複数の労働者を作成して設定しますTF_CONFIGに応じて、各作業者の変数を。

このチュートリアルでは、2つのワーカーを使用します。

  • 最初の( 'chief' )労働者のTF_CONFIG上に表示されます。
  • 二労働者のために、あなたが設定されますtf_config['task']['index']=1

ノートブックの環境変数とサブプロセス

サブプロセスは、親から環境変数を継承します。

たとえば、このJupyterNotebookプロセスで環境変数を次のように設定できます。

os.environ['GREETINGS'] = 'Hello TensorFlow!'

次に、サブプロセスから環境変数にアクセスできます。

echo ${GREETINGS}
Hello TensorFlow!

次のセクションでは、合格するために同様の方法を使用しますTF_CONFIGワーカーサブプロセスに。実際のシナリオでは、この方法でジョブを起動することはありませんが、この例ではそれで十分です。

適切な戦略を選択する

TensorFlowには、分散トレーニングの2つの主要な形式があります。

  • トレーニングのステップは、労働者とレプリカ間で同期される同期トレーニング、および
  • トレーニングステップは厳密に同期されません非同期トレーニング、(例えば、パラメータサーバのトレーニング)。

このチュートリアルでは、インスタンス用いた同期マルチ労働者の訓練を実行する方法を示しtf.distribute.MultiWorkerMirroredStrategy

MultiWorkerMirroredStrategyすべての労働者全体で、各デバイス上でモデルのレイヤー内のすべての変数のコピーを作成します。これは、使用していますCollectiveOps集約グラデーションに、集団通信のためのTensorFlowオペアンプ、および同期変数を保持します。 tf.distribute.Strategyガイドは、この戦略の詳細を持っています。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy介して複数の実装を提供CommunicationOptionsパラメータ:1) RINGクロスホスト通信層としてgRPCを使用して実装環系集団を、 2) NCCL使用していますNVIDIA集合通信ライブラリを集団を実装します。そして3) AUTO 、実行時に選択を延期します。集合的な実装の最適な選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続によって異なります。

モデルをトレーニングする

統合によりtf.distribute.StrategyにAPI tf.kerasのみ変更するには、複数の研究者にトレーニングを配布するようになりますモデル構築と囲むさmodel.compile()内部コールstrategy.scope() 。流通戦略のスコープのおもむく方法と場所の変数が作成され、かつの場合にMultiWorkerMirroredStrategy 、作成した変数はMirroredVariable秒、そしてそれらは、労働者のそれぞれに複製されます。

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

実際に実行するにはMultiWorkerMirroredStrategyあなたはワーカープロセスを実行し、合格する必要がありますTF_CONFIG彼らに。

同様mnist.py先に書かれたファイル、ここにあるmain.py労働者のそれぞれが実行することは:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

そのノート上記のコードでglobal_batch_sizeに渡される、 Dataset.batch 、に設定されているper_worker_batch_size * num_workers 。各作業者がのバッチ処理する、これは性を保証per_worker_batch_sizeかかわらず、労働者の数の例。

現在のディレクトリには、両方のPythonファイルが含まれています。

ls *.py
main.py
mnist.py

JSONシリアライズのでTF_CONFIG環境変数に追加します。

os.environ['TF_CONFIG'] = json.dumps(tf_config)

さて、あなたが実行され、ワーカープロセスを起動することができmain.pyし、使用TF_CONFIG

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

上記のコマンドについて注意すべき点がいくつかあります。

  1. これは、使用しています%%bashあるノートブック「魔法」いくつかのbashのコマンドを実行すると。
  2. これは、使用しています--bg実行するためのフラグをbashこの労働者は終了しませんので、バックグラウンドでプロセスを。それが始まる前にそれはすべての労働者を待ちます。

ので、バックグラウンドにワーカープロセスは、このノートブックに出力を印刷しない&>後でログファイルに何が起こったかを調べることができるようにファイルに出力をリダイレクトします。

したがって、プロセスが開始するまで数秒待ちます。

import time
time.sleep(10)

ここで、これまでにワーカーのログファイルに出力されたものを調べます。

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345

ログファイルの最後の行は言う必要があります: Started server with target: grpc://localhost:12345 。これで最初のワーカーの準備が整い、他のすべてのワーカーの準備が整うのを待っています。

だから、更新tf_config拾うために、第2のワーカーのプロセスのために:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 51ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835
2021-08-20 01:22:07.529925: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:22:07.529987: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.529996: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:22:07.530089: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:22:07.530125: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:22:07.530136: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:22:07.530785: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:22:07.536395: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:22:07.536968: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:23456
2021-08-20 01:22:08.764867: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.983898: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.985655: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)

最初のワーカーによって書き込まれたログを再確認すると、そのモデルがそのモデルのトレーニングに参加したことがわかります。

cat job_0.log
2021-08-20 01:21:57.459034: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-08-20 01:21:57.459133: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459414: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-2087993482
2021-08-20 01:21:57.459531: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-08-20 01:21:57.459575: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 470.57.2
2021-08-20 01:21:57.459586: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 470.57.2
2021-08-20 01:21:57.460413: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-08-20 01:21:57.466180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-08-20 01:21:57.466667: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:12345
2021-08-20 01:22:08.759563: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:08.976883: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
2021-08-20 01:22:08.978435: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:185] None of the MLIR Optimization Passes are enabled (registered 2)
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.2796 - accuracy: 0.1292
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2285 - accuracy: 0.2898
Epoch 3/3
70/70 [==============================] - 4s 54ms/step - loss: 2.1706 - accuracy: 0.4835

当然のことながら、このRAN遅くテストよりも、このチュートリアルの最初に実行します。

1台のマシンで複数のワーカーを実行すると、オーバーヘッドが増えるだけです。

ここでの目標は、トレーニング時間を改善することではなく、マルチワーカートレーニングの例を示すことだけでした。

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

マルチワーカートレーニングの詳細

これまで、基本的なマルチワーカーセットアップを実行する方法を学習しました。

チュートリアルの残りの部分では、実際のユースケースで役立つまたは重要になる可能性のある他の要因について詳しく学習します。

データセットのシャーディング

マルチ労働者の訓練では、データセットシャーディングは収束し、パフォーマンスを確保するために必要とされています。

前節の例が提供するデフォルトのautoshardingに依存しているtf.distribute.Strategy API。あなたは、設定することで、シャーディングを制御することができtf.data.experimental.AutoShardPolicytf.data.experimental.DistributeOptions

自動シャーディングの詳細については、を参照してください分散入力ガイド

ここで(推奨されません)各レプリカはすべての例を処理だから、オフシャーディング自動車をオンにする方法の簡単な例です:

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

評価

あなたが渡すとvalidation_dataModel.fit 、それは各エポックのための訓練と評価切り替わります。服用評価validation_data労働者の同じセットに分散され、評価結果を集約し、すべての労働者のために利用可能です。

トレーニングと同様に、検証データセットはファイルレベルで自動的にシャーディングされます。あなたは、検証データセットのグローバルなバッチサイズを設定し、設定する必要がvalidation_steps

評価には、繰り返しデータセットを使用することもお勧めします。

または、定期的にチェックポイントを読み取り、評価を実行する別のタスクを作成することもできます。これはEstimatorが行うことです。ただし、これは評価を実行するための推奨される方法ではないため、その詳細は省略されています。

パフォーマンス

あなたは今、すべての複数の労働者で実行するように設定されているKerasモデル持っMultiWorkerMirroredStrategy

マルチワーカートレーニングのパフォーマンスを微調整するには、次のことを試してください。

  • tf.distribute.MultiWorkerMirroredStrategy複数提供集団通信の実装を

    • RINGクロスホスト通信層としてgRPCを使用して実装環系集団。
    • NCCL使用していますNVIDIA集合通信ライブラリを集団を実装します。
    • AUTO 、実行時に選択を延期します。

    集合的な実装の最適な選択は、GPUの数、GPUのタイプ、およびクラスター内のネットワーク相互接続によって異なります。自動選択を無効にするには、指定communication_optionsパラメータMultiWorkerMirroredStrategyのコンストラクタを。例えば:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • 変数をキャストtf.float可能な場合:

    • 公式ResNetモデルは、例えば、これを行う方法のを。

フォールトトレランス

同期トレーニングでは、ワーカーの1つに障害が発生し、障害回復メカニズムが存在しない場合、クラスターに障害が発生します。

Kerasを使用tf.distribute.Strategy 、労働者が死ぬか、そうでなければ不安定な場合にフォールトトレランスの利点が付属しています。これを行うには、選択した分散ファイルシステムでトレーニング状態を保持します。これにより、以前に失敗またはプリエンプトされたインスタンスを再起動すると、トレーニング状態が回復します。

ワーカーが使用できなくなると、他のワーカーは失敗します(おそらくタイムアウト後)。このような場合、使用できないワーカー、および障害が発生した他のワーカーを再起動する必要があります。

ModelCheckpointコールバック

ModelCheckpointコールバックはもはやフォールトトレランス機能を提供し、使用しないでくださいBackupAndRestore代わりにコールバックを。

ModelCheckpointコールバックは、まだチェックポイントを保存するために使用することができます。ただし、これにより、トレーニングが中断された場合、または正常に終了した場合、チェックポイントからトレーニングを続行するには、ユーザーがモデルを手動でロードする必要があります。

必要に応じてユーザーが外モデル/ウェイトを保存し、復元することを選択できModelCheckpointコールバック。

モデルの保存と読み込み

使用してモデルを保存するにはmodel.saveまたはtf.saved_model.save 、保存先は、各労働者のために異なることが必要です。

  • チーフワーカー以外の場合は、モデルを一時ディレクトリに保存する必要があります。
  • チーフの場合は、提供されたモデルディレクトリに保存する必要があります。

複数のワーカーが同じ場所に書き込もうとしたことによるエラーを防ぐために、ワーカーの一時ディレクトリは一意である必要があります。

すべてのディレクトリに保存されているモデルは同一であり、通常、復元または提供するには、チーフによって保存されたモデルのみを参照する必要があります。

トレーニングが完了すると、ワーカーによって作成された一時ディレクトリを削除するクリーンアップロジックが必要です。

チーフとワーカーを同時に節約する理由は、チェックポイント中に変数を集約している可能性があるためです。これには、チーフとワーカーの両方がallreduce通信プロトコルに参加する必要があります。一方、チーフとワーカーを同じモデルディレクトリに保存させると、競合によるエラーが発生します。

使用MultiWorkerMirroredStrategy 、プログラムはすべての労働者に実行され、現在の労働者がチーフであるかどうかを知るために、その属性があり、クラスタリゾルバオブジェクトを利用してtask_typetask_id

  • task_type現在のジョブが(例えば何であるかを示しています'worker' )。
  • task_idあなたの労働者の識別子を伝えます。
  • 労働者task_id == 0チーフ労働者として指定されています。

以下のスニペットのコードでは、 write_filepath機能は、労働者のに依存する、書き込みへのファイル・パスを提供していtask_id

  • (とチーフ労働者のためにtask_id == 0 )、それは元のファイル・パスに書き込みます。
  • 他の労働者のために、それは一時的なディレクトリ-作成temp_dir -with task_idして書き込むためにディレクトリ・パスに:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

これで、保存する準備が整いました。

multi_worker_model.save(write_model_path)
2021-08-20 01:22:24.305980: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

上記のように、後でモデルは保存されたパスチーフからのみロードする必要があるため、チーフ以外のワーカーが保存した一時的なものを削除しましょう。

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

さて、負荷のときの時間は、のが便利使用してみましょうtf.keras.models.load_model APIを、さらに作業を続行します。

ここでは、唯一の負荷に、単一の労働者を使用すると想定して訓練を続け、あなたは呼び出すことはありません、その場合にはtf.keras.models.load_model別以内strategy.scope() (なお、 strategy = tf.distribute.MultiWorkerMirroredStrategy()以前に定義されているように、 )::

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 16ms/step - loss: 2.2960 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 15ms/step - loss: 2.2795 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f633b103910>

チェックポイントの保存と復元

一方、チェックポイントを使用すると、モデル全体を保存しなくても、モデルの重みを保存して復元できます。

ここで、あなたは1作成しますtf.train.Checkpointによって管理されているモデルで、追跡tf.train.CheckpointManagerのみ、最新のチェックポイントが保存されているように、:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

いったんCheckpointManager設定されている、あなたは今保存し、非チーフ労働者が保存されていたチェックポイントを削除する準備が整いました:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

あなたがモデルを復元する必要がある場合さて、あなたは便利な使用して保存された最新のチェックポイントを見つけることができますtf.train.latest_checkpoint機能を。チェックポイントを復元した後、トレーニングを続行できます。

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2021-08-20 01:22:26.176660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-08-20 01:22:26.388321: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.2948 - accuracy: 0.0000e+00
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2785 - accuracy: 0.0000e+00
<keras.callbacks.History at 0x7f635d404450>

BackupAndRestoreコールバック

tf.keras.callbacks.experimental.BackupAndRestoreコールバックは、下の一時的なチェックポイント・ファイル内のモデルと現在のエポック番号をバックアップすることにより、フォールトトレランス機能を提供しbackup_dirの引数BackupAndRestore 。これは、各エポックの終わりに行われます。

ジョブが中断されて再開されると、コールバックは最後のチェックポイントを復元し、トレーニングは中断されたエポックの最初から続行されます。中断前に未完了のエポックですでに行われた部分的なトレーニングは破棄されるため、最終的なモデルの状態には影響しません。

これを使用するには、インスタンスの提供tf.keras.callbacks.experimental.BackupAndRestoreModel.fitコール。

MultiWorkerMirroredStrategy労働者が中断された場合、中断労働者が再起動されるまで、クラスタ全体が一時停止します。他のワーカーも再起動し、中断されたワーカーがクラスターに再参加します。次に、すべてのワーカーが以前に保存されたチェックポイントファイルを読み取り、以前の状態を取得します。これにより、クラスターの同期を取り戻すことができます。その後、トレーニングが続行されます。

BackupAndRestoreコールバックは使用していますCheckpointManager保存し、トラックが最新のものと一緒にチェックポイントを既存のことをチェックポイントと呼ばれるファイルを生成し、トレーニング状態を、復元します。このため、 backup_dir名前の衝突を避けるために、他のチェックポイントを保存するために再使用すべきではありません。

現在、 BackupAndRestoreコールバックが無い戦略を持つ単一の労働者、MirroredStrategy、およびMultiWorkerMirroredStrategyとマルチ労働者をサポートしています。以下は、マルチワーカートレーニングとシングルワーカートレーニングの両方の2つの例です。

# Multi-worker training with MultiWorkerMirroredStrategy
# and the BackupAndRestore callback.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2021-08-20 01:22:29.530251: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 12ms/step - loss: 2.2759 - accuracy: 0.1625
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2146 - accuracy: 0.2761
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.1456 - accuracy: 0.4344
<keras.callbacks.History at 0x7f635d2aac90>

あなたがのディレクトリ調べる場合はbackup_dirあなたが指定したBackupAndRestore 、あなたには、いくつかの一時的に生成チェックポイントファイルに気づくことがあります。これらのファイルは、以前に失われたインスタンスを回復するために必要とされる、と彼らはの終わりにライブラリーによって削除されますModel.fitあなたのトレーニングの成功の終了時に。

追加のリソース

  1. TensorFlowの中に分散トレーニングガイドは、利用可能な流通戦略の概要を説明します。
  2. KerasとMultiWorkerMirroredStrategyとカスタムトレーニングループチュートリアルショーがどのように使用するMultiWorkerMirroredStrategy Kerasと、カスタム・トレーニング・ループを。
  3. チェックアウトの公式モデルが、その多くは、複数の流通戦略を実行するように設定することができます。
  4. tf.functionの持つ優れた性能ガイドは、以下のような他の戦略とツールに関する情報提供TensorFlowプロファイラあなたがTensorFlowモデルのパフォーマンスを最適化するために使用することができます。