Keras で DTensors を使用する

TensorFlow.org で表示 Google Colab で実行 GitHubでソースを表示 ノートブックをダウンロード

概要

このチュートリアルでは、Keras で DTensor を使用する方法について学習します。

DTensor を Keras と組み合わせることで、分散型機械学習モデルの構築とトレーニングに既存の Keras レイヤーとモデルを再利用することができます。

MNIST データを使用してマルチレイヤーの分類モデルをトレーニングします。サブクラス化モデル、Sequential モデル、Functional モデルのレイアウトの設定について説明します。

このチュートリアルでは、すでに「DTensor プログラミングガイド」を読んでいること、MeshLayout などの基本的な DTensor の概念に精通していることを前提としています。

このチュートリアルでは、https://www.tensorflow.org/datasets/keras_example を基盤に使用しています。

MNIST モデルをビルドする

DTensor は、TensorFlow 2.9.0 リリースに含まれています。

pip install --quiet --upgrade --pre tensorflow tensorflow-datasets

次に、tensorflowtensorflow.experimental.dtensor をインポートし、8 個の仮想 CPU を使用するように TensorFlow を構成します。

この例では CPU を使用しますが、DTensor は CPU、GPU、または TPU デバイスで同じように動作します。

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
2024-01-11 18:16:50.357646: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-01-11 18:16:50.357693: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-01-11 18:16:50.359187: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
def configure_virtual_cpus(ncpu):
  phy_devices = tf.config.list_physical_devices('CPU')
  tf.config.set_logical_device_configuration(
        phy_devices[0], 
        [tf.config.LogicalDeviceConfiguration()] * ncpu)

configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')

devices = [f'CPU:{i}' for i in range(8)]

決定論的疑似乱数ジェネレータ

1 つ注意しておかなければならないのは、DTensor API では、実行中の各クライアントに同じランダムシードがある必要があることです。そうすることで、重みの初期化で決定論的動作が得られます。これは、Keras で tf.keras.utils.set_random_seed() を使ってグローバルシードを設定することで行えます。

tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)

データ並列メッシュを作成する

このチュートリアルでは、データ並列トレーニングを実演します。モデルの並列トレーニングと空間の並列トレーニングへの適応は、別のセットの Layout オブジェクトに切り替えるのと同じくらい単純です。データ並列を超える分散トレーニングについての詳細は、DTensor の詳細な ML チュートリアルをご覧ください。

データ並列トレーニングは、一般的に使用される並列トレーニングですが、tf.distribute.MirroredStrategy などによっても使用されます。

DTensor を使うと、データ並列トレーニングループは、単一の 'batch' 次元で構成される Mesh を使用します。各デバイスは、グローバルの batch からシャードを受け取るモデルのレプリカを実行します。

mesh = dtensor.create_mesh([("batch", 8)], devices=devices)

各デバイスがモデルの完全なレプリカを実行する過程で、モデルの変数がメッシュ(シャーディングなし)間で完全に複製されます。例として、この Mesh の階数 2 の重みに対して完全に複製されるレイアウトは、以下のようになります。

example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)  # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)

階数 2 のデータテンソルのレイアウトは、最初の次元(batch_sharded としても知られます)に沿ってシャーディングされます。

example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh)  # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)

レイアウトで Keras レイヤーを作成する

データ並列スキームでは通常、モデルの各レプリカがシャーディングされた入力データを使って計算を行えるように、完全に複製されたレイアウトを使ってモデルの重みを作成します。

レイヤーの重みのレイアウト情報を構成するために、Keras では、ほとんどの組み込みレイヤーで使用できる追加のパラメータをレイヤーコンストラクタに公開しています。

以下は、完全に複製された重みレイアウトを使用する小さな画像分類モデルを構築する例です。レイアウト情報の kernelbias は、kernel_layoutbias_layout の引数を介して tf.keras.layers.Dense に指定できます。組み込み Keras レイヤーのほとんどは、レイアウトの重みの Layout を明示的に指定できるようになっています。

unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, 
                        activation='relu',
                        name='d1',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d),
  tf.keras.layers.Dense(10,
                        name='d2',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d)
])

レイアウト情報は、重みの layout プロパティを調べることで確認できます。

for weight in model.weights:
  print(f'Weight name: {weight.name} with layout: {weight.layout}')
  break
Weight name: d1/kernel:0 with layout: Layout.from_string(sharding_specs:unsharded,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)

データセットを読み込んで入力パイプラインを構築する

MNIST データセットを読み込んで、それに使用する事前処理用の入力パイプラインを構成します。データセット自体は DTensor レイアウト情報に関連付けられていません。今後の TensorFlow リリースにおいて、tf.data との DTensor Keras 統合が改善される予定です。

(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label
batch_size = 128

ds_train = ds_train.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)

モデルのトレーニングロジックを定義する

次に、モデルのトレーニングロジックと評価ロジックを定義します。

TensorFlow 2.9 の時点では、DTensor が有効化された Keras モデルにカスタムトレーニングループを書き込む必要があります。これは、入力データに、Keras の標準の tf.keras.Model.fit() または tf.keras.Model.eval() 関数には組み込まれていない適切なレイアウト情報を詰め込むために行います。さらに多くの tf.data サポートが、今後のリリースで追加される予定です。

@tf.function
def train_step(model, x, y, optimizer, metrics):
  with tf.GradientTape() as tape:
    logits = model(x, training=True)
    # tf.reduce_sum sums the batch sharded per-example loss to a replicated
    # global loss (scalar).
    loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'loss': loss_per_sample}
  return results
@tf.function
def eval_step(model, x, y, metrics):
  logits = model(x, training=False)
  loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'eval_loss': loss_per_sample}
  return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
  num_local_devices = image_layout.mesh.num_local_devices()
  images = tf.split(images, num_local_devices)
  labels = tf.split(labels, num_local_devices)
  images = dtensor.pack(images, image_layout)
  labels = dtensor.pack(labels, label_layout)
  return  images, labels

Metric と Optimizer

Keras MetricOptimizer を使って DTensor API を使用する場合、追加のメッシュ情報を指定して、内部状態変数とテンソルがモデルの変数と連携できるようにする必要があります。

  • オプティマイザの場合、DTensor は keras.dtensor.experimental.optimizers という新しい実験的な名前空間を使用します。多くの既存の Keras Optimizer は、追加の mesh 引数を受け取るように拡張されます。今後のリリースでは、Keras のコアオプティマイザにマージされる可能性があります。

  • 指標の場合、DTensor 対応の Metric になるように、コンストラクタに直接引数として mesh を指定できます。

optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}

モデルのトレーニング

以下の例では、batch 次元で入力パイプラインのデータをシャード化し、完全に複製された重みをもつモデルを使ってトレーニングします。

モデルは 3 つのエポックで、約 97% の精度を達成します。

num_epochs = 3

image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

for epoch in range(num_epochs):
  print("============================") 
  print("Epoch: ", epoch)
  for metric in metrics.values():
    metric.reset_state()
  step = 0
  results = {}
  pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
  for input in ds_train:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)

    results.update(train_step(model, images, labels, optimizer, metrics))
    for metric_name, metric in metrics.items():
      results[metric_name] = metric.result()

    pbar.update(step, values=results.items(), finalize=False)
    step += 1
  pbar.update(step, values=results.items(), finalize=True)

  for metric in eval_metrics.values():
    metric.reset_state()
  for input in ds_test:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)
    results.update(eval_step(model, images, labels, eval_metrics))

  for metric_name, metric in eval_metrics.items():
    results[metric_name] = metric.result()

  for metric_name, metric in results.items():
    print(f"{metric_name}: {metric.numpy()}")
============================
Epoch:  0
    469/Unknown - 7s 15ms/step - loss: 0.2907 - accuracy: 0.8308
    469/Unknown - 4s 9ms/step - loss: 0.1285 - accuracy: 0.9595
    469/Unknown - 4s 9ms/step - loss: 0.1010 - accuracy: 0.9682
loss: 0.044021397829055786
accuracy: 0.9682833552360535
eval_loss: 0.05413995310664177
eval_accuracy: 0.9656000137329102

既存のモデルコードのレイアウトを指定する

ほとんどの場合、モデルは特定のユースケースでうまく動作するようになっているため、モデル内の個別のレイヤーに Layout 情報を指定する作業は膨大であり、多数の編集作業が必要となります。

既存の Keras モデルを DTensor API で動作できるようにするための変換作業を行いやすくするために、グローバルな観点で Layout を指定できる新しい dtensor.LayoutMap API を使用できます。

まず、LayoutMap インスタンスを作成する必要があります。これは、モデルの重みに指定するすべての Layout を構成するディクショナリのようなオブジェクトです。

LayoutMap には、init 時に Mesh インスタンスが必要です。これは、Layout が構成されていない、任意の重みに対するデフォルトの複製済み Layout を指定するために使用できます。すべてのモデルの重みを完全に複製するだけの場合は、空の LayoutMap を指定すると、デフォルトのメッシュを使って複製された Layout が作成されます。

LayoutMap は、文字列をキーとして、Layout を値として使用します。通常の Python dict とこのクラスでは、動作が異なります。文字列キーは、値を取得する際の正規表現として処理されます。

Subclassed モデル

Keras のサブクラス化モデル構文を使って定義された以下のモデルについて考察してみましょう。

class SubclassedModel(tf.keras.Model):

  def __init__(self, name=None):
    super().__init__(name=name)
    self.feature = tf.keras.layers.Dense(16)
    self.feature_2 = tf.keras.layers.Dense(24)
    self.dropout = tf.keras.layers.Dropout(0.1)

  def call(self, inputs, training=None):
    x = self.feature(inputs)
    x = self.dropout(x, training=training)
    return self.feature_2(x)

このモデルには、2 つの Dense レイヤーに対し kernelbias レイヤーという 4 つの重みがあります。それぞれは、オブジェクトパスに基づいてマッピングされています。

  • model.feature.kernel
  • model.feature.bias
  • model.feature_2.kernel
  • model.feature_2.bias

注意: Subclassed モデルでは、マッピングから Layout を取得する際に、レイヤーの .name 属性ではなく、属性名がキーとして使用されます。これは、tf.Module のチェックポイント設定が使う規則と同じです。多数のレイヤーを持つ複雑なモデルでは、チェックポイントを手動で検査することで、属性のマッピングを確認できます。

では、以下の LayoutMap を定義して、モデルを適用しましょう。

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

with layout_map.scope():
  subclassed_model = SubclassedModel()

モデルの重みは最初のセルに作成されているため、DTensor 入力でモデルを呼び出し、重みに期待されるレイアウトがあることを確認します。

dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)

print(subclassed_model.feature.kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)

これにより、既存のコードを更新することなく、Layout をすばやくモデルにマッピングすることができます。

Sequential モデルと Functional モデル

Keras Functional モデルと Sequential モデルの場合も、LayoutMap を使用できます。

注意: Functional モデルと Sequential モデルでは、マッピングにわずかな違いがあります。モデルのレイヤーには、モデルに接続された公開属性がありません(ただし、model.layers を介してリストとしてアクセス可能です)。この場合、文字列名をキーとして使用します。文字列名は、モデル内で必ず一意の値です。

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
  inputs = tf.keras.Input((16,), batch_size=16)
  x = tf.keras.layers.Dense(16, name='feature')(inputs)
  x = tf.keras.layers.Dropout(0.1)(x)
  output = tf.keras.layers.Dense(32, name='feature_2')(x)
  model = tf.keras.Model(inputs, output)

print(model.layers[1].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
with layout_map.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
      tf.keras.layers.Dropout(0.1),
      tf.keras.layers.Dense(32, name='feature_2')
  ])

print(model.layers[2].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)