このページは Cloud Translation API によって翻訳されました。
Switch to English

Kerasによるマルチワーカートレーニング

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示ノートブックをダウンロード

概要概要

このチュートリアルでは、 tf.distribute.Strategy API、具体的にはtf.distribute.MultiWorkerMirroredStrategyを使用したtf.distribute.Strategyモデルを使用したマルチワーカー分散トレーニングについて説明しtf.distribute.MultiWorkerMirroredStrategy 。この戦略の助けを借りて、単一のワーカーで実行するように設計されたKerasモデルは、最小限のコード変更で複数のワーカーでシームレスに作業できます。

TensorFlowの中に分散トレーニングガイドでは、流通戦略のより深い理解に興味のある人のためのTensorFlow支持体の概観のために利用可能であるtf.distribute.StrategyのAPI。

セットアップ

まず、いくつかの必要なインポート。

import json
import os
import sys

TensorFlowをインポートする前に、環境にいくつかの変更を加えます。

すべてのGPUを無効にします。これにより、すべてのワーカーが同じGPUを使用しようとすることによって発生するエラーが防止されます。実際のアプリケーションでは、各ワーカーは異なるマシン上にあります。

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG環境変数をリセットします。これについては、後で詳しく説明します。

os.environ.pop('TF_CONFIG', None)

現在のディレクトリがPythonのパス上にあることを確認してください。これにより、ノートブックは後で%%writefileによって書き込まれたファイルをインポートできます。

if '.' not in sys.path:
  sys.path.insert(0, '.')

次に、TensorFlowをインポートします。

import tensorflow as tf

データセットとモデルの定義

次に、単純なモデルとデータセットのセットアップを使用してmnist.pyファイルを作成します。このPythonファイルは、このチュートリアルのワーカープロセスによって使用されます。

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

少数のエポックについてモデルをトレーニングし、1人のワーカーの結果を観察して、すべてが正しく機能することを確認してください。トレーニングが進むにつれて、損失は減少し、精度は向上するはずです。

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 2s 13ms/step - loss: 2.3003 - accuracy: 0.1326
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2360 - accuracy: 0.3211
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1608 - accuracy: 0.4944

<tensorflow.python.keras.callbacks.History at 0x7fa7f41a5400>

マルチワーカー構成

それでは、マルチワーカートレーニングの世界に入りましょう。 TensorFlowでは、 TF_CONFIG環境変数は、それぞれが異なる役割を持つ可能性のある複数のマシンでのトレーニングに必要です。 TF_CONFIGは、クラスターの一部である各ワーカーのクラスター構成を指定するために使用されるJSON文字列です。

構成例を次に示します。

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

これは、JSON文字列としてTF_CONFIG化された同じTF_CONFIGです。

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG clustertask 2つのコンポーネントがありtask

  • clusterはすべてのワーカーで同じであり、 workerなどのさまざまなタイプのジョブで構成されるdictであるトレーニングクラスターに関する情報を提供します。 MultiWorkerMirroredStrategyマルチワーカートレーニングでは、通常のworkerが行うことに加えて、チェックポイントの保存やTensorBoardの要約ファイルの作成などのもう少し責任を負うworkerが通常1 workerいます。このような労働者は、次のように呼ばれているchief労働者、そしてあることが通例であるworkerindex 0がチーフに任命されたworker (実際には、これはどのようにあるtf.distribute.Strategy実装されています)。

  • taskは現在のタスクの情報を提供し、ワーカーごとに異なります。そのワーカーのtypeindexを指定します。

この例では、タスクtype"worker"し、タスクindex0ます。この機械は最初の労働者であり、主任労働者として任命され、他の機械よりも多くの仕事をします。他のマシンでもTF_CONFIG環境変数を設定する必要があり、 cluster dictは同じである必要がありclusterが、それらのマシンの役割に応じてタスクtypeまたはタスクindexが異なることに注意してください。

説明のために、このチュートリアルでは、 localhostで2つのワーカーをTF_CONFIGしてTF_CONFIGを設定する方法を示します。実際には、ユーザーは外部IPアドレス/ポートに複数のワーカーを作成し、各ワーカーにTF_CONFIGを適切に設定します。

この例では、2つのワーカーを使用します。最初のワーカーのTF_CONFIGは上に示されています。 2番目のワーカーの場合、 tf_config['task']['index']=1

上記のtf_configは、Pythonの単なるローカル変数です。実際にトレーニングを構成するために使用するには、このディクショナリをJSONとしてシリアル化し、 TF_CONFIG環境変数に配置する必要があります。

ノートブックの環境変数とサブプロセス

サブプロセスは、親から環境変数を継承します。したがって、このjupyter notebookプロセスで環境変数を設定すると、次のようになります。

os.environ['GREETINGS'] = 'Hello TensorFlow!'

サブプロセスから環境変数にアクセスできます。

echo ${GREETINGS}
Hello TensorFlow!

次のセクションでは、これを使用してTF_CONFIGをワーカーサブプロセスに渡します。この方法で実際にジョブを起動することは決してありませんが、このチュートリアルの目的には十分です。最小限のマルチワーカーの例を示すためです。

適切な戦略を選択する

TensorFlowには、分散トレーニングの2つの主要な形式があります。

  • トレーニングのステップがワーカーとレプリカ間で同期される同期トレーニング、および
  • トレーニングステップが厳密に同期されていない非同期トレーニング。

このガイドでは、同期マルチワーカートレーニングの推奨戦略であるMultiWorkerMirroredStrategyについて説明します。モデルをトレーニングするには、 tf.distribute.MultiWorkerMirroredStrategyインスタンスを使用します。

MultiWorkerMirroredStrategyは、すべてのワーカーの各デバイスのモデルのレイヤーにあるすべての変数のコピーを作成します。集合通信用のTensorFlow操作であるCollectiveOps使用して、グラデーションを集約し、変数の同期を維持します。 tf.distribute.Strategyガイドには、この戦略の詳細が記載されています。

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategyは、 CommunicationOptionsパラメーターを介して複数の実装を提供します。 RINGは、クロスホスト通信レイヤーとしてgRPCを使用して、リングベースの集合を実装します。 NCCLは、 NvidiaのNCCLを使用して集合を実装します。 AUTOは、選択をランタイムに延期します。集合的な実装の最適な選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続によって異なります。

モデルをトレーニングする

統合によりtf.distribute.StrategyにAPI tf.kerasのみ変更するには、複数の研究者にトレーニングを配布するようになりますモデル構築と囲むさmodel.compile()内部コールstrategy.scope() 。分散戦略のスコープは、変数が作成される方法と場所を決定しますMultiWorkerMirroredStrategyの場合、作成される変数はMirroredVariableであり、各ワーカーに複製されます。

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

MultiWorkerMirroredStrategy実際に実行するには、ワーカープロセスを実行し、それらにTF_CONFIGを渡す必要があります。

前に記述したmnist.pyファイルと同様に、各ワーカーが実行するmain.pyは次のmain.pyです。

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

上記のコードスニペットでは、 global_batch_sizeに渡されるDataset.batchper_worker_batch_size * num_workers設定されていることにper_worker_batch_size * num_workers 。これにより、ワーカーの数に関係なく、各ワーカーがper_worker_batch_sizeサンプルのバッチを処理するようになります。

現在のディレクトリには、両方のPythonファイルが含まれています。

ls *.py
main.py
mnist.py

したがって、json- TF_CONFIGをシリアル化し、環境変数に追加します。

os.environ['TF_CONFIG'] = json.dumps(tf_config)

さて、あなたが実行され、ワーカープロセスを起動することができmain.pyし、使用TF_CONFIG

# first kill any previous runs
%killbgscripts
All background processes were killed.

python main.py &> job_0.log

上記のコマンドについて注意すべき点がいくつかあります。

  1. ノートブックの「魔法」である%%bashを使用して、いくつかのbashコマンドを実行します。
  2. このワーカーは終了しないため、 --bgフラグを使用してバックグラウンドでbashプロセスを実行します。それが始まる前にそれはすべての労働者を待ちます。

バックグラウンドのワーカープロセスはこのノートブックに出力を出力しないため、 &>は出力をファイルにリダイレクトし、何が起こったかを確認できます。

したがって、プロセスが開始するまで数秒待ちます。

import time
time.sleep(10)

ここで、これまでにワーカーのログファイルに出力されたものを確認します。

cat job_0.log
2021-01-21 02:25:42.175986: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-21 02:25:43.946875: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:43.947856: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-21 02:25:45.054212: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-21 02:25:45.054293: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054344: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-21 02:25:45.054509: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-21 02:25:45.054517: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-21 02:25:45.055458: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-21 02:25:45.055932: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.056674: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.061293: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-21 02:25:45.061752: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

ログファイルの最後の行には、次のように表示されます。 Started server with target: grpc://localhost:12345 。これで最初のワーカーの準備が整い、他のすべてのワーカーの準備が整うのを待っています。

したがって、2番目のワーカーのプロセスがtf_configするようにtf_configを更新します。

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

次に、2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。

python main.py
Epoch 1/3
70/70 [==============================] - 6s 54ms/step - loss: 2.3042 - accuracy: 0.0955
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2330 - accuracy: 0.3486
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1501 - accuracy: 0.5445

2021-01-21 02:25:52.272702: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-21 02:25:54.022387: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:54.023369: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-21 02:25:55.313133: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-21 02:25:55.313203: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:55.313212: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:55.313332: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-21 02:25:55.313373: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-21 02:25:55.313386: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-21 02:25:55.314278: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-21 02:25:55.314696: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:55.315494: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:55.319659: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-21 02:25:55.320191: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-01-21 02:25:56.295743: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-01-21 02:25:56.548639: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-21 02:25:56.549007: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000185000 Hz

ここで、最初のワーカーによって書き込まれたログを再確認すると、そのモデルのトレーニングに参加していることがわかります。

cat job_0.log
2021-01-21 02:25:42.175986: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-01-21 02:25:43.946875: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:43.947856: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-21 02:25:45.054212: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-21 02:25:45.054293: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054344: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1568283877
2021-01-21 02:25:45.054471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-01-21 02:25:45.054509: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-01-21 02:25:45.054517: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-01-21 02:25:45.055458: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX512F
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-01-21 02:25:45.055932: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.056674: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-21 02:25:45.061293: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-01-21 02:25:45.061752: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-01-21 02:25:56.294854: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-01-21 02:25:56.539862: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-21 02:25:56.540266: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000185000 Hz
Epoch 1/3
70/70 [==============================] - 7s 54ms/step - loss: 2.3042 - accuracy: 0.0955
Epoch 2/3
70/70 [==============================] - 4s 52ms/step - loss: 2.2330 - accuracy: 0.3486
Epoch 3/3
70/70 [==============================] - 4s 51ms/step - loss: 2.1501 - accuracy: 0.5445

当然のことながら、これはこのチュートリアルの最初に実行されたテストより遅く実行されました。 1台のマシンで複数のワーカーを実行すると、オーバーヘッドが増えるだけです。ここでの目標は、トレーニング時間を改善することではなく、マルチワーカートレーニングの例を示すことだけでした。

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

マルチワーカートレーニングの詳細

これまでのところ、このチュートリアルでは、基本的なマルチワーカーのセットアップについて説明してきました。このドキュメントの残りの部分では、実際のユースケースに役立つまたは重要な他の要素について詳しく説明します。

データセットのシャーディング

マルチワーカートレーニングでは、収束とパフォーマンスを確保するためにデータセットのシャーディングが必要です。

前のセクションの例は、 tf.distribute.Strategyによって提供されるデフォルトの自動tf.distribute.Strategy依存しています。あなたは、設定することで、シャーディングを制御することができtf.data.experimental.AutoShardPolicytf.data.experimental.DistributeOptions 。自動シャーディングの詳細については、分散入力ガイドを参照してください

自動シャーディングをオフにして、各レプリカがすべての例を処理する方法の簡単な例を次に示します(非推奨)。

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

評価

あなたが渡すとvalidation_datamodel.fit 、それは各エポックのための訓練と評価切り替わります。 validation_dataを取得validation_data評価は、同じワーカーセットに分散され、評価結果は集約され、すべてのワーカーが利用できます。トレーニングと同様に、検証データセットはファイルレベルで自動的にシャーディングされます。あなたは、検証データセットとセットにグローバルなバッチサイズを設定する必要がありますvalidation_steps 。評価には、繰り返しデータセットを使用することもお勧めします。

または、定期的にチェックポイントを読み取り、評価を実行する別のタスクを作成することもできます。これはEstimatorが行うことです。ただし、これは評価を実行するための推奨される方法ではないため、詳細は省略されています。

予測

現在、 model.predictmodel.predictでは機能しませんMultiWorkerMirroredStrategy.

パフォーマンス

これで、 MultiWorkerMirroredStrategyして複数のワーカーで実行するようにすべて設定されたMultiWorkerMirroredStrategy 。次の手法を試して、 MultiWorkerMirroredStrategyを使用してマルチワーカートレーニングのパフォーマンスを微調整MultiWorkerMirroredStrategyます。

  • MultiWorkerMirroredStrategyは、複数の集合的な通信実装を提供します。 RINGは、クロスホスト通信レイヤーとしてgRPCを使用して、リングベースの集合を実装します。 NCCLは、 NvidiaのNCCLを使用して集合を実装します。 AUTOは、選択をランタイムに延期します。集合的な実装の最適な選択は、GPUの数と種類、およびクラスター内のネットワーク相互接続によって異なります。自動選択をオーバーライドするには、 MultiWorkerMirroredStrategyのコンストラクターのcommunication_optionsパラメーターを指定しますcommunication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)例: communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
  • tf.float 、変数をtf.floatキャストします。公式のResNetモデルにはこれを行う方法のが含まれています。

フォールトトレランス

同期トレーニングでは、ワーカーの1つに障害が発生し、障害回復メカニズムが存在しない場合、クラスターに障害が発生します。 tf.distribute.Strategytf.distribute.Strategyを使用すると、作業者が死亡したり不安定になったりした場合のフォールトトレランスの利点があります。これを行うには、選択した分散ファイルシステムでトレーニング状態を保持します。これにより、以前に失敗またはプリエンプトされたインスタンスを再起動すると、トレーニング状態が回復します。

ワーカーが使用できなくなると、他のワーカーは失敗します(おそらくタイムアウト後)。このような場合、使用できないワーカー、および失敗した他のワーカーを再起動する必要があります。

ModelCheckpointコールバック

ModelCheckpointコールバックは、フォールトトレランス機能を提供しなくなりました。代わりにBackupAndRestoreコールバックを使用してください。

ModelCheckpointコールバックは、引き続きチェックポイントを保存するために使用できます。ただし、これにより、トレーニングが中断された場合、または正常に終了した場合、チェックポイントからトレーニングを続行するには、ユーザーがモデルを手動でロードする必要があります。

オプションで、ユーザーはModelCheckpointコールバックの外部でモデル/重みを保存および復元することを選択できます。

モデルの保存と読み込み

model.saveまたはtf.saved_model.saveを使用してモデルを保存するには、保存先がワーカーごとに異なる必要があります。チーフ以外のワーカーでは、モデルを一時ディレクトリに保存する必要があり、チーフでは、提供されたモデルディレクトリに保存する必要があります。複数のワーカーが同じ場所に書き込もうとしたことによるエラーを防ぐために、ワーカーの一時ディレクトリは一意である必要があります。すべてのディレクトリに保存されているモデルは同一であり、通常、復元または提供するには、チーフによって保存されたモデルのみを参照する必要があります。トレーニングが完了すると、ワーカーによって作成された一時ディレクトリを削除するクリーンアップロジックが必要です。

チーフとワーカーを同時に節約する必要がある理由は、チェックポイント中に変数を集約している可能性があるため、チーフとワーカーの両方がallreduce通信プロトコルに参加する必要があるためです。一方、チーフとワーカーを同じモデルディレクトリに保存させると、競合によるエラーが発生します。

MultiWorkerMirroredStrategyと、プログラムはすべてのワーカーで実行され、現在のワーカーがチーフであるかどうかを知るために、属性task_typeおよびtask_idを持つクラスターリゾルバーオブジェクトをtask_idます。 task_typeは、現在のジョブが何であるか(たとえば、「worker」)を示し、 task_idは、ワーカーの識別子を示します。 ID0のワーカーがチーフワーカーとして指定されます。

以下のコードスニペットでは、 write_filepathは、ワーカーIDに応じて書き込むファイルパスを提供します。チーフ(ID 0のワーカー)の場合、元のファイルパスに書き込みます。その他の場合は、書き込み用の一時ディレクトリ(ディレクトリパスにidを含む)を作成します。

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # If `task_type` is None, this may be operating as single worker, which works
  # effectively as chief.
  return task_type is None or task_type == 'chief' or (
            task_type == 'worker' and task_id == 0)

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

これで、保存する準備が整いました。

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

INFO:tensorflow:Assets written to: /tmp/keras-model/assets

上記のように、後でモデルは保存されたパスチーフからのみロードする必要があるため、チーフ以外のワーカーが保存した一時的なものを削除しましょう。

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

さて、ロードするときは、便利なtf.keras.models.load_model APIを使用して、さらに作業を続けましょう。ここでは、単一のワーカーのみを使用してトレーニングをロードして続行すると仮定します。この場合、別のstrategy.scope()内でtf.keras.models.load_modelないでください。

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 14ms/step - loss: 2.2981 - accuracy: 0.1039
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2813 - accuracy: 0.1430

<tensorflow.python.keras.callbacks.History at 0x7fa7f102cbe0>

チェックポイントの保存と復元

一方、チェックポイントを使用すると、モデル全体を保存しなくても、モデルの重みを保存して復元できます。ここでは、モデルを追跡する1つのtf.train.Checkpointを作成します。これは、 tf.train.CheckpointManagerによって管理され、最新のチェックポイントのみが保持されるようにします。

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

CheckpointManagerを設定すると、保存する準備が整い、チーフワーカー以外が保存したチェックポイントを削除できます。

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

これで、復元する必要があるときに、便利なtf.train.latest_checkpoint関数を使用して保存された最新のチェックポイントを見つけることができます。チェックポイントを復元した後、トレーニングを続行できます。

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 14ms/step - loss: 2.3013 - accuracy: 0.1083
Epoch 2/2
20/20 [==============================] - 0s 14ms/step - loss: 2.2850 - accuracy: 0.1703

<tensorflow.python.keras.callbacks.History at 0x7fa7f0fc6f60>

BackupAndRestoreコールバック

BackupAndRestoreコールバックは、下の一時的なチェックポイント・ファイル内のモデルと現在のエポック番号をバックアップすることで、フォールトトレランス機能を提供しbackup_dirの引数BackupAndRestore 。これは、各エポックの終わりに行われます。

ジョブが中断されて再開されると、コールバックは最後のチェックポイントを復元し、トレーニングは中断されたエポックの最初から続行されます。中断前に未完了のエポックですでに行われた部分的なトレーニングは破棄されるため、最終的なモデルの状態には影響しません。

これを使用するには、 tf.keras.Model.fit()呼び出しでtf.keras.callbacks.experimental.BackupAndRestoreインスタンスを提供します。

MultiWorkerMirroredStrategyを使用すると、ワーカーが中断された場合、中断されたワーカーが再起動されるまでクラスター全体が一時停止します。他のワーカーも再起動し、中断されたワーカーがクラスターに再参加します。次に、すべてのワーカーが以前に保存されたチェックポイントファイルを読み取り、以前の状態を取得することで、クラスターの同期を取り戻すことができます。その後、トレーニングは続行されます。

BackupAndRestoreコールバックは、 CheckpointManagerを使用してトレーニング状態を保存および復元します。これにより、既存のチェックポイントを最新のチェックポイントと一緒に追跡するチェックポイントと呼ばれるファイルが生成されます。このため、名前の衝突を避けるために、 backup_dirを他のチェックポイントの保存に再利用しないでください。

現在、 BackupAndRestoreコールバックは、 BackupAndRestoreないシングルワーカー、MirroredStrategy、およびMultiWorkerMirroredStrategyのあるマルチワーカーをサポートしています。以下は、マルチワーカートレーニングとシングルワーカートレーニングの両方の2つの例です。

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2811 - accuracy: 0.1663
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1709 - accuracy: 0.3821
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.0420 - accuracy: 0.5454

<tensorflow.python.keras.callbacks.History at 0x7fa7f0623fd0>

あなたがのディレクトリ調べる場合はbackup_dirあなたが指定したBackupAndRestore 、あなたには、いくつかの一時的に生成チェックポイントファイルに気づくことがあります。これらのファイルは、以前に失われたインスタンスを回復するために必要であり、トレーニングが正常に終了すると、 tf.keras.Model.fit()最後にライブラリによって削除されます。

も参照してください

  1. TensorFlowガイドの分散トレーニングでは、利用可能な配布戦略の概要を説明しています。
  2. 公式モデル。その多くは、複数の配布戦略を実行するように構成できます。
  3. ガイドの「パフォーマンス」セクションには、TensorFlowモデルのパフォーマンスを最適化するために使用できる他の戦略とツールに関する情報が記載されています。