DTensor の概念

TensorFlow.org で表示 Google Colabで実行 GitHubでソースを表示 ノートブックをダウンロード

概要

この Colab では、同期分散コンピューティングを行うための TensorFlow の拡張機能として提供されている DTensor を紹介します。

DTensor は、開発者がデバイス間の分散を内部的に管理しながら、Tensor でグローバルに動作するアプリケーションを作成できるグローバルプログラミングモデルを提供します。DTensor は、SPMD(単一プログラム複数データ) expansion と呼ばれる手順を通じて、シャーディングディレクティブに従ってプログラムとテンソルを分散します。

アプリケーションとシャーディングディレクティブを分離することで、DTensor は、グローバルセマンティクスを保持しながら、単一のデバイス、複数のデバイス、または複数のクライアントにおける同一のアプリケーションの実行を可能にします。

このガイドでは、分散コンピューティングの DTensor の概念と、DTensor が TensorFlow とどのように統合するかについて説明します。モデルトレーニングで DTensor を使用したデモについては、DTensor を使った分散型トレーニングチュートリアルをご覧ください。

セットアップ

DTensor は TensorFlow 2.9.0 リリースの一部であり、2022 年 4 月 9 日より、TensorFlow ナイトリービルドにも含まれています。

pip install --quiet --upgrade --pre tensorflow

インストールが完了したら、tensorflowtf.experimental.dtensor をインポートします。そして、6 個の仮想 CPU を使用するように、TensorFlow を構成します。

この例では vCPU を使用しますが、DTensor は CPU、GPU、または TPU デバイスでも同じように動作します。

import tensorflow as tf
from tensorflow.experimental import dtensor

print('TensorFlow version:', tf.__version__)

def configure_virtual_cpus(ncpu):
  phy_devices = tf.config.list_physical_devices('CPU')
  tf.config.set_logical_device_configuration(phy_devices[0], [
        tf.config.LogicalDeviceConfiguration(),
    ] * ncpu)

configure_virtual_cpus(6)
DEVICES = [f'CPU:{i}' for i in range(6)]

tf.config.list_logical_devices('CPU')
2024-01-11 19:29:04.703904: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-01-11 19:29:04.703959: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-01-11 19:29:04.705496: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
TensorFlow version: 2.15.0
[LogicalDevice(name='/device:CPU:0', device_type='CPU'),
 LogicalDevice(name='/device:CPU:1', device_type='CPU'),
 LogicalDevice(name='/device:CPU:2', device_type='CPU'),
 LogicalDevice(name='/device:CPU:3', device_type='CPU'),
 LogicalDevice(name='/device:CPU:4', device_type='CPU'),
 LogicalDevice(name='/device:CPU:5', device_type='CPU')]

分散テンソルの DTensor モデル

DTensor は、dtensor.Meshdtensor.Layout の 2 つの概念を導入します。これらはテンソルのシャーディングをトポロジー的に関連するデバイス間でモデル化する抽象です。

  • Mesh は、コンピュテーションのデバイスリストを定義します。
  • Layout は、Mesh でテンソル次元をシャーディングする方法を定義します。

Mesh

Mesh は、一連のデバイスの論理的な直行トポロジーを表現します。直行グリッドの各次元はメッシュ次元と呼ばれ、名前で参照されます。同じ Mesh 内のメッシュの名前は一意である必要があります。

メッシュ次元の名前は Layout によって参照され、tf.Tensor の各軸に沿ったシャーディングの動作を説明します。これについては、Layout に関する後方のセクションでさらに詳しく説明します。

Mesh は、デバイスの多次元配列として考えることができます。

1 次元 Mesh では、すべてのデバイスが単一のメッシュ次元でリストを形成します。次の例では、dtensor.create_mesh を使用して、6 CPU デバイスから 6 デバイスのサイズを持つメッシュ次元 'x' のメッシュを作成します。

6 CPU による 1 次元メッシュ

mesh_1d = dtensor.create_mesh([('x', 6)], devices=DEVICES)
print(mesh_1d)
Mesh.from_string(|x=6|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5)

Mesh は多次元でもあります。次の例では、6 CPU デバイスで 3x2 のメッシュを形成します。'x' 次元メッシュは 3 デバイスのサイズ、'y' 次元メッシュは 2 デバイスのサイズです。

A 2 dimensional mesh with 6 CPUs

mesh_2d = dtensor.create_mesh([('x', 3), ('y', 2)], devices=DEVICES)
print(mesh_2d)
Mesh.from_string(|x=3,y=2|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5)

Layout

Layout は、テンソルが Mesh でどのように分散されるか、またはシャーディングされるかを指定します。

注意: MeshLayout を混同しないために、このガイドでは、次元と言った場合は常に Mesh に、と言った場合は常に TensorLayout に関連付けています。

Layout の階数は、Layout が適用されている Tensor の階数と同じです。Tensor の各軸では、Layout がテンソルをシャーディングするメッシュ次元を指定しているか、字句を "シャーディングなし" として指定する場合があります。テンソルはシャーディングされていない任意のメッシュ次元で複製されます。

Layout の階数と Mesh の次元数が一致している必要はありません。Layoutunsharded の軸がメッシュ次元に関連する必要も、unsharded メッシュ次元が layout 軸に関連している必要もありません。

Diagram of dtensor components.

前のセクションで作成した MeshLayout の例をいくつか分析してみましょう。

[("x", 6)] などの 1 次元メッシュ(前のセクションの mesh_1d)では、Layout(["unsharded", "unsharded"], mesh_1d) は、6 個のデバイスで複製された 2 階数のテンソルのレイアウトです。1 階数のメッシュで複製されたテンソル

layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh_1d)

同じテンソルとメッシュを使用すると、レイアウト Layout(['unsharded', 'x']) は、6 個のデバイスでテンソルの 2 番目の軸をシャーディングします。

A tensor sharded across a rank-1 mesh

layout = dtensor.Layout([dtensor.UNSHARDED, 'x'], mesh_1d)

[("x", 3), ("y", 2)] などの 2 次元の 3x2 メッシュ(前のセクションの mesh_2d)とした場合、Layout(["y", "x"], mesh_2d) は 2 階数 Tensor のレイアウトで、最初の軸はメッシュ次元 "y" で、2 番目の軸はメッシュ次元 "x" でシャーディングされます。

A tensorr with it's first axis sharded across mesh dimension 'y' and it's second axis sharded across mesh dimension 'x'

layout = dtensor.Layout(['y', 'x'], mesh_2d)

同じ mesh_2d において、レイアウト Layout(["x", dtensor.UNSHARDED], mesh_2d) は、"y" で複製される 2 階数 Tensor のレイアウトで、最初の軸はメッシュ次元 x でシャーディングされます。

A tensor replicated across mesh-dimension y, with it's first axis sharded across mesh dimension 'x'

layout = dtensor.Layout(["x", dtensor.UNSHARDED], mesh_2d)

シングルクライアントとマルチクライアントのアプリケーション

DTensor は、シングルクライアントとマルチクライアントの両方のアプリケーションをサポートしています。Colab の Python カーネルはシングルクライアントアプリケーションの例で、Python プロセスが 1 つしかありません。

マルチクライアント DTensor アプリケーションでは、複数の Python プロセスが一貫性のあるアプリケーションとして集合的に実行します。マルチクライアント DTensor アプリケーションの Mesh の直交グリッドは、現在のクライアントにローカルで接続されているか、別のクライアントにリモートで接続されているかに関係なく、デバイス全体に広がります。Mesh が使用する一連の全デバイスは、グローバルデバイスリストと呼ばれます。

マルチクライアント DTensor アプリケーションでの Mesh の作成は、すべての参加クライアントが同一のグローバルデバイスリストを使う集合的な演算で、Mesh の作成はグローバルなバリアとして機能します。

Mesh を作成中、各クライアントはローカルデバイスリストと期待されるグローバアルデバイスリストを提供し、DTensor はそれら両方のリストが一貫していることを検証します。マルチクライアントメッシュの作成とグローバルデバイスリストの詳細については、dtensor.create_meshdtensor.create_distributed_mesh の API ドキュメントをご覧ください。

シングルクライアントは、クライアントが 1 つしかないマルチクライアントの特殊ケースとして考え得られます。シングルクライアントアプリケーションの場合、グローバルデバイスリストローカルデバイスリストと同一です。

シャーディングされたテンソルとしての DTensor

では、DTensor を使ってコーディングを始めましょう。ヘルパー関数の dtensor_from_array は、tf.Tensor のように見えるものから DTensor を作成する方法を説明しています。この関数は 2 つのステップを実行します。

  • テンソルをメッシュ上のすべてのデバイスに複製する
  • 引数でリクエストされているレイアウトに従って、コピーをシャーディングする
def dtensor_from_array(arr, layout, shape=None, dtype=None):
  """Convert a DTensor from something that looks like an array or Tensor.

  This function is convenient for quick doodling DTensors from a known,
  unsharded data object in a single-client environment. This is not the
  most efficient way of creating a DTensor, but it will do for this
  tutorial.
  """
  if shape is not None or dtype is not None:
    arr = tf.constant(arr, shape=shape, dtype=dtype)

  # replicate the input to the mesh
  a = dtensor.copy_to_mesh(arr,
          layout=dtensor.Layout.replicated(layout.mesh, rank=layout.rank))
  # shard the copy to the desirable layout
  return dtensor.relayout(a, layout=layout)

DTensor の構造

DTensor は tf.Tensor オブジェクトですが、シャーディングの振る舞いを定義する Layout アノテーションで拡張されています。DTensor は以下の内容で構成されています。

  • テンソルのグローバルな形状と dtype などを含むグローバルテンソルメタデータ
  • Tensor が属する Mesh と、Tensor がその Mesh にどのようにシャーディングされるかを定義する Layout
  • Mesh 内のローカルデバイスあたり 1 つの項目を持つコンポーネントテンソルのリスト

dtensor_from_array を使用すると、最初の DTensor である my_first_dtensor を作成し、その内容を調べることができます。

mesh = dtensor.create_mesh([("x", 6)], devices=DEVICES)
layout = dtensor.Layout([dtensor.UNSHARDED], mesh)

my_first_dtensor = dtensor_from_array([0, 1], layout)

# Examine the dtensor content
print(my_first_dtensor)
print("global shape:", my_first_dtensor.shape)
print("dtype:", my_first_dtensor.dtype)
tf.Tensor([0 1], layout="sharding_specs:unsharded, mesh:|x=6|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(2,), dtype=int32)
global shape: (2,)
dtype: <dtype: 'int32'>

レイアウトと fetch_layout

DTensor のレイアウトは、tf.Tensor の通常の属性ではありません。代わりに DTensor は DTensor のレイアウトにアクセスするための関数 dtensor.fetch_layout を提供します。

print(dtensor.fetch_layout(my_first_dtensor))
assert layout == dtensor.fetch_layout(my_first_dtensor)
Layout.from_string(sharding_specs:unsharded, mesh:|x=6|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5)

コンポーネントテンソル、packunpack

DTensor はコンポーネントテンソルのリストで構成されます。Mesh 内のデバイスのコンポーネントテンソルは、そのデバイスに格納されているグローバル DTensor を表現する Tensor オブジェクトです。

DTensor は dtensor.unpack を使ってコンポーネントテンソルにアンパックできます。dtensor.unpack を使用すれば、DTensor のコンポーネントを調べて、それらが Mesh のすべてのデバイス上にあることを確認できます。

グローバルビューのコンポーネントテンソルの位置は、互いに重なり合っていることに注意してください。たとえば、完全に複製されたレイアウトの場合、すべてのコンポーネントはグローバルテンソルの同一のレプリカになっています。

for component_tensor in dtensor.unpack(my_first_dtensor):
  print("Device:", component_tensor.device, ",", component_tensor)
Device: /job:localhost/replica:0/task:0/device:CPU:0 , tf.Tensor([0 1], shape=(2,), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:1 , tf.Tensor([0 1], shape=(2,), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:2 , tf.Tensor([0 1], shape=(2,), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:3 , tf.Tensor([0 1], shape=(2,), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:4 , tf.Tensor([0 1], shape=(2,), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:5 , tf.Tensor([0 1], shape=(2,), dtype=int32)

示されているとおり、my_first_dtensor は、すべての 6 個のデバイスに複製されている [0, 1] のテンソルです。

dtensor.unpack の反対の演算は dtensor.pack です。コンポーネントテンソルは DTensor にパックし直すことができます。

コンポーネントには同じ階数と dtype がある必要があります。つまり、これが、戻される DTensor の階数と dtype になります。ただし、dtensor.unpack の入力として、コンポーネントテンソルのデバイスの配置に関する厳格な要件はありません。関数は、コンポーネントテンソルを自動的に対応するそれぞれのデバイスにコピーします。

packed_dtensor = dtensor.pack(
    [[0, 1], [0, 1], [0, 1],
     [0, 1], [0, 1], [0, 1]],
     layout=layout
)
print(packed_dtensor)
tf.Tensor([0 1], layout="sharding_specs:unsharded, mesh:|x=6|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(2,), dtype=int32)

DTensor をメッシュにシャーディングする

ここまで、my_first_dtensor を操作してきました。これは、1 次元 Mesh に完全に複製された 1 階数 DTensor です。

次は、2 次元 Mesh にシャーディングされた DTensor を作成して検査します。次の例では、6 個の CPU デバイス上の 3x2 Mesh でこの操作を行います。メッシュ次元 'x' のサイズは 3 デバイス、メッシュ次元 'y' のサイズは 2 デバイスです。

mesh = dtensor.create_mesh([("x", 3), ("y", 2)], devices=DEVICES)

2 次元メッシュで完全にシャーディングされた 2 階数 Tensor

3x2 の 2 階数 DTensor を作成し、最初の軸を 'x' メッシュ次元に沿って、2 番目の軸を 'y' メッシュ次元に沿ってシャーディングします。

  • テンソルの形状は、すべてのシャーディングされた軸に沿ってメッシュ次元と同じであるため、各デバイスは DTensor の1 つの要素を受け取ります。
  • コンポーネントテンソルの階数は、必ずグローバル形状の階数と同じです。DTensor はコンポーネントテンソルとグローバル DTensor の関係を特定するための情報を保持する単純な方法として、この手法を採用しています。
fully_sharded_dtensor = dtensor_from_array(
    tf.reshape(tf.range(6), (3, 2)),
    layout=dtensor.Layout(["x", "y"], mesh))

for raw_component in dtensor.unpack(fully_sharded_dtensor):
  print("Device:", raw_component.device, ",", raw_component)
Device: /job:localhost/replica:0/task:0/device:CPU:0 , tf.Tensor([[0]], shape=(1, 1), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:1 , tf.Tensor([[1]], shape=(1, 1), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:2 , tf.Tensor([[2]], shape=(1, 1), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:3 , tf.Tensor([[3]], shape=(1, 1), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:4 , tf.Tensor([[4]], shape=(1, 1), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:5 , tf.Tensor([[5]], shape=(1, 1), dtype=int32)

2 次元メッシュで完全に複製された 2 階数 Tensor

比較するために、同じ 2 次元メッシュに完全に複製された 3x2 の 2 階数 DTensor を作成します。

  • DTensor は完全に複製されているため、各デバイスは 3x2 DTensor の完全レプリカを受け取ります。
  • コンポーネントテンソルの階数はグローバル形状の階数と同じです。この場合、コンポーネントテンソルの形状はいずれにしてもグローバル形状と同じであるため、特に難しい事ではありません。
fully_replicated_dtensor = dtensor_from_array(
    tf.reshape(tf.range(6), (3, 2)),
    layout=dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh))
# Or, layout=tensor.Layout.fully_replicated(mesh, rank=2)

for component_tensor in dtensor.unpack(fully_replicated_dtensor):
  print("Device:", component_tensor.device, ",", component_tensor)
Device: /job:localhost/replica:0/task:0/device:CPU:0 , tf.Tensor(
[[0 1]
 [2 3]
 [4 5]], shape=(3, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:1 , tf.Tensor(
[[0 1]
 [2 3]
 [4 5]], shape=(3, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:2 , tf.Tensor(
[[0 1]
 [2 3]
 [4 5]], shape=(3, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:3 , tf.Tensor(
[[0 1]
 [2 3]
 [4 5]], shape=(3, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:4 , tf.Tensor(
[[0 1]
 [2 3]
 [4 5]], shape=(3, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:5 , tf.Tensor(
[[0 1]
 [2 3]
 [4 5]], shape=(3, 2), dtype=int32)

2 次元メッシュのハイブリッド 2 階数 Tensor

完全シャーディングと完全複製の合間の場合はどうでしょうか。

DTensor では、Layout をハイブリッドにすることができます。ある軸でシャーディングされ、他の軸で複製されたレイアウトです。

たとえば、同じ 3x2 の 2 階数 DTensor を以下のようにシャーディングできます。

  • 1 つ目の軸を 'x' 次元メッシュに沿ってシャーディング
  • 2 つ目の軸を 'y' 次元メッシュに沿って複製

このシャーディングスキームは、2 つ目の軸のシャーディング仕様を 'y' から dtensor.UNSHARDED に置き換え、2 番目の軸にそって複製する意図を示すだけで実現できます。レイアウトオブジェクトは Layout(['x', dtensor.UNSHARDED], mesh) のようになります。

hybrid_sharded_dtensor = dtensor_from_array(
    tf.reshape(tf.range(6), (3, 2)),
    layout=dtensor.Layout(['x', dtensor.UNSHARDED], mesh))

for component_tensor in dtensor.unpack(hybrid_sharded_dtensor):
  print("Device:", component_tensor.device, ",", component_tensor)
Device: /job:localhost/replica:0/task:0/device:CPU:0 , tf.Tensor([[0 1]], shape=(1, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:1 , tf.Tensor([[0 1]], shape=(1, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:2 , tf.Tensor([[2 3]], shape=(1, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:3 , tf.Tensor([[2 3]], shape=(1, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:4 , tf.Tensor([[4 5]], shape=(1, 2), dtype=int32)
Device: /job:localhost/replica:0/task:0/device:CPU:5 , tf.Tensor([[4 5]], shape=(1, 2), dtype=int32)

作成した DTensor のコンポーネントテンソルを検査し、これらが実際にスキームに従ってシャーディングされていることを確認できます。この様子をチャートで示すとわかりやすいでしょう。

A 3x2 hybrid mesh with 6 CPUs

Tensor.numpy() とシャーディングされた DTensor

シャーディングされた DTensor に .numpy() を呼び出すとエラーが発生することに注意してください。エラーが発生する理由は、複数のコンピューティングデバイスのデータが、返される NumPy 配列をサポートするホスト CPU デバイスに意図せずに収集されないようにするためです。

print(fully_replicated_dtensor.numpy())

try:
  fully_sharded_dtensor.numpy()
except tf.errors.UnimplementedError:
  print("got an error as expected for fully_sharded_dtensor")

try:
  hybrid_sharded_dtensor.numpy()
except tf.errors.UnimplementedError:
  print("got an error as expected for hybrid_sharded_dtensor")
[[0 1]
 [2 3]
 [4 5]]
got an error as expected for fully_sharded_dtensor
got an error as expected for hybrid_sharded_dtensor

DTensor での TensorFlow API

DTensor はプログラムのテンソルのドロップイン代替となることを目指しています。Ops ライブラリ関数の tf.functiontf.GradientTape といった、tf.Tensor を消費する TensorFlow Python API も DTensor と動作します。

それを実現するため、それぞれの TensorFlow Graph に対し、DTensor は SPMD expansion と呼ばれる手順で相当する SPMD グラフを生成して実行します。DTensor の SPMD expansion には、以下のような重要なステップがいくつか含まれます。

  • DTensor のシャーディング Layout を TensorFlow グラフに伝搬する
  • グローバル DTensor の TensorFlow Ops をコンポーネントテンソルの相当する TensorFlow Ops に書き換え、必要に応じて集合的な通信 Ops を挿入する
  • バックエンドの中立した TensorFlow Ops をバックエンド固有の TensorFlow Ops に降格する

最終的に、DTensor は Tensor のドロップイン代替になります。

注意: DTensor はまだ実験的 API であるため、DTensor プログラミングモデルの境界と制限を探索しながら克服する作業となります。

DTensor の実行は 2 つの方法でトリガーされます。

  • tf.matmul(a, b) のように、Python 関数のオペランドとしての DTensor は a または b のいずれか、または両方が DTensor である場合に DTensor を介して実行します。
  • Python 関数の結果が DTensor となるようにリクエストすると(dtensor.call_with_layout(tf.ones, layout, shape=(3, 2)) など)、tf.ones の出力が layout に従ってシャーディングされるようにリクエストすることになるため、DTensor を介して実行されます。

オペランドとしての DTensor

多数の TensorFlow API 関数はオペランドとして tf.Tensor を取り、結果として tf.Tensor を返します。このような関数の場合、DTensor をオペランドとして渡すことで、DTensor を介して関数を実行する意図を示すことができます。このセクションでは、例として tf.matmul(a, b) を使用します。

完全に複製された入力と出力

この場合、DTensors は完全に複製されています。Mesh の各デバイスで、以下のようになっています。

  • オペランド a のコンポーネントテンソルは [[1, 2, 3], [4, 5, 6]](2x3)
  • オペランド b のコンポーネントテンソルは [[6, 5], [4, 3], [2, 1]](3x2)
  • コンピュテーションは、単一の MatMul(2x3, 3x2) -> 2x2 で構成されます。
  • 結果 c のコンポーネントテンソルは [[20, 14], [56,41]](2x2)

浮動小数点 mul 演算の合計数は、6 device * 4 result * 3 mul = 72 です。

mesh = dtensor.create_mesh([("x", 6)], devices=DEVICES)
layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)
a = dtensor_from_array([[1, 2, 3], [4, 5, 6]], layout=layout)
b = dtensor_from_array([[6, 5], [4, 3], [2, 1]], layout=layout)

c = tf.matmul(a, b) # runs 6 identical matmuls in parallel on 6 devices

# `c` is a DTensor replicated on all devices (same as `a` and `b`)
print('Sharding spec:', dtensor.fetch_layout(c).sharding_specs)
print("components:")
for component_tensor in dtensor.unpack(c):
  print(component_tensor.device, component_tensor.numpy())
Sharding spec: ['unsharded', 'unsharded']
components:
/job:localhost/replica:0/task:0/device:CPU:0 [[20 14]
 [56 41]]
/job:localhost/replica:0/task:0/device:CPU:1 [[20 14]
 [56 41]]
/job:localhost/replica:0/task:0/device:CPU:2 [[20 14]
 [56 41]]
/job:localhost/replica:0/task:0/device:CPU:3 [[20 14]
 [56 41]]
/job:localhost/replica:0/task:0/device:CPU:4 [[20 14]
 [56 41]]
/job:localhost/replica:0/task:0/device:CPU:5 [[20 14]
 [56 41]]

収縮した軸に沿ってオペランドをシャーディングする

デバイスごとのコンピュテーションの量は、オペランド ab をシャーディングすることで、減らすことができます。tf.matmul の一般的なシャーディングスキームは、収縮の軸に沿ったオペランドのシャーディングで、2 番目の軸に沿った a のシャーディングと 1 番目の軸に沿った b のシャーディングです。

このスキームでシャーディングされるグローバル行列積は、同時に実行するローカル matmul と、それに続くローカル結果を集計するための一括還元によって効率的に実行可能です。これは、分散行列ドット積の正規の方法でもあります。

浮動小数点 mul 演算の合計数は、6 devices * 4 result * 1 = 24 で、完全に複製された上記のケース(72)に対する係数 3 の還元です。係数 3 は、3 デバイスのサイズで x 次元メッシュに沿って共有されるためです。

順次実行される操作数の削減は、同期モデル並列処理がトレーニングを加速する主なメカニズムです。

mesh = dtensor.create_mesh([("x", 3), ("y", 2)], devices=DEVICES)
a_layout = dtensor.Layout([dtensor.UNSHARDED, 'x'], mesh)
a = dtensor_from_array([[1, 2, 3], [4, 5, 6]], layout=a_layout)
b_layout = dtensor.Layout(['x', dtensor.UNSHARDED], mesh)
b = dtensor_from_array([[6, 5], [4, 3], [2, 1]], layout=b_layout)

c = tf.matmul(a, b)
# `c` is a DTensor replicated on all devices (same as `a` and `b`)
print('Sharding spec:', dtensor.fetch_layout(c).sharding_specs)
Sharding spec: ['unsharded', 'unsharded']

追加シャーディング

入力に追加のシャーディングを実行し、結果に適切に引き継ぐことが可能です。たとえば、最初の軸に沿ったオペランド a の追加のシャーディングを 'y' 次元メッシュに適用することができます。追加のシャーディングは、結果 c の最初に軸に引き継がれます。

浮動小数点 mul 演算の合計数は、6 devices * 2 result * 1 = 12 で、完全に複製された上記のケース(24)に対する係数 2 の還元です。係数 2 は、2 デバイスのサイズで y 次元メッシュに沿って共有されるためです。

mesh = dtensor.create_mesh([("x", 3), ("y", 2)], devices=DEVICES)

a_layout = dtensor.Layout(['y', 'x'], mesh)
a = dtensor_from_array([[1, 2, 3], [4, 5, 6]], layout=a_layout)
b_layout = dtensor.Layout(['x', dtensor.UNSHARDED], mesh)
b = dtensor_from_array([[6, 5], [4, 3], [2, 1]], layout=b_layout)

c = tf.matmul(a, b)
# The sharding of `a` on the first axis is carried to `c'
print('Sharding spec:', dtensor.fetch_layout(c).sharding_specs)
print("components:")
for component_tensor in dtensor.unpack(c):
  print(component_tensor.device, component_tensor.numpy())
Sharding spec: ['y', 'unsharded']
components:
/job:localhost/replica:0/task:0/device:CPU:0 [[20 14]]
/job:localhost/replica:0/task:0/device:CPU:1 [[56 41]]
/job:localhost/replica:0/task:0/device:CPU:2 [[20 14]]
/job:localhost/replica:0/task:0/device:CPU:3 [[56 41]]
/job:localhost/replica:0/task:0/device:CPU:4 [[20 14]]
/job:localhost/replica:0/task:0/device:CPU:5 [[56 41]]

出力としての DTensor

オペランドを取らずに、シャーディング可能な Tensor 結果を返す Python 関数の場合はどうでしょうか。以下のような関数がこれに該当します。

こういった Python 関数の場合、DTensor には、DTensor で Python 関数を Eager 実行する dtensor.call_with_layout が備わっており、返される Tensor が要求された Layout を使った DTensor であることを保証します。

help(dtensor.call_with_layout)
Help on function call_with_layout in module tensorflow.dtensor.python.api:

call_with_layout(fn: Callable[..., Any], layout: Optional[tensorflow.dtensor.python.layout.Layout], *args, **kwargs) -> Any
    Calls a function in the DTensor device scope if `layout` is not None.
    
    If `layout` is not None, `fn` consumes DTensor(s) as input and produces a
    DTensor as output; a DTensor is a tf.Tensor with layout-related attributes.
    
    If `layout` is None, `fn` consumes and produces regular tf.Tensors.
    
    Args:
      fn: A supported TF API function such as tf.zeros.
      layout: Optional, the layout of the output DTensor.
      *args:  Arguments given to `fn`.
      **kwargs: Keyword arguments given to `fn`.
    
    Returns:
      The return value of `fn` transformed to a DTensor if requested.

Eager 実行された Python 関数には通常、1 つの自明ではない TensorFlow Op のみが含まれます。

dtensor.call_with_layout で複数の TensorFlow Op を発行する Python 関数を使用するには、関数を tf.function に変換する必要があります。tf.function の呼び出しは、単一の TensorFlow Op です。tf.function が呼び出されると、DTensor は tf.function の計算グラフを分析するときに、中間テンソルのいずれかが具体化される前にレイアウトの伝播を実行できます。

1 つの TensorFlow Op を発行する API

関数が 1 つの TensorFlow Op を発行する場合、その関数に直接 dtensor.call_with_layout を適用できます。

help(tf.ones)
Help on function ones in module tensorflow.python.ops.array_ops:

ones(shape, dtype=tf.float32, name=None, layout=None)
    Creates a tensor with all elements set to one (1).
    
    See also `tf.ones_like`, `tf.zeros`, `tf.fill`, `tf.eye`.
    
    This operation returns a tensor of type `dtype` with shape `shape` and
    all elements set to one.
    
    >>> tf.ones([3, 4], tf.int32)
    <tf.Tensor: shape=(3, 4), dtype=int32, numpy=
    array([[1, 1, 1, 1],
           [1, 1, 1, 1],
           [1, 1, 1, 1]], dtype=int32)>
    
    Args:
      shape: A `list` of integers, a `tuple` of integers, or a 1-D `Tensor` of
        type `int32`.
      dtype: Optional DType of an element in the resulting `Tensor`. Default is
        `tf.float32`.
      name: Optional string. A name for the operation.
      layout: Optional, `tf.experimental.dtensor.Layout`. If provided, the result
        is a [DTensor](https://www.tensorflow.org/guide/dtensor_overview) with the
        provided layout.
    
    Returns:
      A `Tensor` with all elements set to one (1).
mesh = dtensor.create_mesh([("x", 3), ("y", 2)], devices=DEVICES)
ones = dtensor.call_with_layout(tf.ones, dtensor.Layout(['x', 'y'], mesh), shape=(6, 4))
print(ones)
tf.Tensor({"CPU:0": [[1 1]
 [1 1]], "CPU:1": [[1 1]
 [1 1]], "CPU:2": [[1 1]
 [1 1]], "CPU:3": [[1 1]
 [1 1]], "CPU:4": [[1 1]
 [1 1]], "CPU:5": [[1 1]
 [1 1]]}, layout="sharding_specs:x,y, mesh:|x=3,y=2|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(6, 4), dtype=float32)

複数の TensorFlow Op を発行する API

API が複数の TensorFlow Op を発行する場合、tf.function を介して関数を 1 つの Op に変換します。たとえば、tf.random.stateleess_normal です。

help(tf.random.stateless_normal)
Help on function stateless_random_normal in module tensorflow.python.ops.stateless_random_ops:

stateless_random_normal(shape, seed, mean=0.0, stddev=1.0, dtype=tf.float32, name=None, alg='auto_select')
    Outputs deterministic pseudorandom values from a normal distribution.
    
    This is a stateless version of `tf.random.normal`: if run twice with the
    same seeds and shapes, it will produce the same pseudorandom numbers.  The
    output is consistent across multiple runs on the same hardware (and between
    CPU and GPU), but may change between versions of TensorFlow or on non-CPU/GPU
    hardware.
    
    Args:
      shape: A 1-D integer Tensor or Python array. The shape of the output tensor.
      seed: A shape [2] Tensor, the seed to the random number generator. Must have
        dtype `int32` or `int64`. (When using XLA, only `int32` is allowed.)
      mean: A 0-D Tensor or Python value of type `dtype`. The mean of the normal
        distribution.
      stddev: A 0-D Tensor or Python value of type `dtype`. The standard deviation
        of the normal distribution.
      dtype: The float type of the output: `float16`, `bfloat16`, `float32`,
        `float64`. Defaults to `float32`.
      name: A name for the operation (optional).
      alg: The RNG algorithm used to generate the random numbers. See
        `tf.random.stateless_uniform` for a detailed explanation.
    
    Returns:
      A tensor of the specified shape filled with random normal values.
ones = dtensor.call_with_layout(
    tf.function(tf.random.stateless_normal),
    dtensor.Layout(['x', 'y'], mesh),
    shape=(6, 4),
    seed=(1, 1))
print(ones)
tf.Tensor({"CPU:0": [[0.0368092842 1.76192284]
 [1.22868407 -0.731756687]], "CPU:1": [[0.255247623 -0.13820985]
 [-0.747412503 1.06443202]], "CPU:2": [[-0.395325899 -0.836183369]
 [0.581941128 -0.2587713]], "CPU:3": [[0.476060659 0.406645179]
 [-0.110623844 -1.49052978]], "CPU:4": [[0.645035267 1.36384416]
 [2.18210244 -0.965060234]], "CPU:5": [[-1.70534277 1.32558191]
 [0.972473264 0.972343624]]}, layout="sharding_specs:x,y, mesh:|x=3,y=2|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(6, 4), dtype=float32)

1 つの TensorFlow Op を発行する Python 関数を tf.function でラップすることができます。唯一の注意点は、Python 関数から tf.function を作成するための関連コストと複雑さが発生することです。

ones = dtensor.call_with_layout(
    tf.function(tf.ones),
    dtensor.Layout(['x', 'y'], mesh),
    shape=(6, 4))
print(ones)
tf.Tensor({"CPU:0": [[1 1]
 [1 1]], "CPU:1": [[1 1]
 [1 1]], "CPU:2": [[1 1]
 [1 1]], "CPU:3": [[1 1]
 [1 1]], "CPU:4": [[1 1]
 [1 1]], "CPU:5": [[1 1]
 [1 1]]}, layout="sharding_specs:x,y, mesh:|x=3,y=2|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(6, 4), dtype=float32)

tf.Variable から dtensor.DVariable

Tensorflow では、tf.Variable はミュータブルの Tensor 値のホルダーです。DTensor では、対応する変数のセマンティクスが dtensor.DVariable によって提供されます。

DTensor 変数に新しい型 Variable が導入されたのは、変数にはレイアウトを初期値から変更できないという追加の要件があるためです。

mesh = dtensor.create_mesh([("x", 6)], devices=DEVICES)
layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)

v = dtensor.DVariable(
    initial_value=dtensor.call_with_layout(
        tf.function(tf.random.stateless_normal),
        layout=layout,
        shape=tf.TensorShape([64, 32]),
        seed=[1, 1],
        dtype=tf.float32))

print(v.handle)
assert layout == dtensor.fetch_layout(v)
tf.Tensor(<ResourceHandle(name="Variable/2", device="/job:localhost/replica:0/task:0/device:CPU:0", container="Anonymous", type="tensorflow::Var", dtype and shapes : "[ DType enum: 1, Shape: [64,32] ]")>, layout="sharding_specs:unsharded,unsharded, mesh:|x=6|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(), dtype=resource)

layout の一致に関する要件を除けば、Variabletf.Variable と同じように動作します。たとえば、変数を DTensor に追加できます。

a = dtensor.call_with_layout(tf.ones, layout=layout, shape=(64, 32))
b = v + a # add DVariable and DTensor
print(b)
tf.Tensor([[2.66521645 2.36637592 1.77863169 ... -1.18624139 2.26035929 0.664066315]
 [0.511952519 0.655031443 0.122243524 ... 0.0424078107 1.67057109 0.912334144]
 [0.769825 1.42743981 3.13473773 ... 1.16159868 0.628931046 0.733521938]
 ...
 [0.388001859 2.72882509 2.92771554 ... 1.17472672 1.72462416 1.5047121]
 [-0.252545118 0.761886716 1.72119033 ... 0.775034547 2.8065362 1.00457215]
 [1.23498726 0.584536672 1.15659761 ... 0.955793858 1.11440909 0.18848455]], layout="sharding_specs:unsharded,unsharded, mesh:|x=6|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(64, 32), dtype=float32)

また、DTensor を DVariable に代入することもできます。

v.assign(a) # assign a DTensor to a DVariable
print(a)
tf.Tensor([[1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 ...
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]
 [1 1 1 ... 1 1 1]], layout="sharding_specs:unsharded,unsharded, mesh:|x=6|0,1,2,3,4,5|0,1,2,3,4,5|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5", shape=(64, 32), dtype=float32)

DTensor に互換性のないレイアウトを割り当てて DVariable のレイアウトを変更しようとすると、エラーが発生します。

# variable's layout is immutable.
another_mesh = dtensor.create_mesh([("x", 3), ("y", 2)], devices=DEVICES)
b = dtensor.call_with_layout(tf.ones,
                     layout=dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], another_mesh),
                     shape=(64, 32))
try:
  v.assign(b)
except:
  print("exception raised")
exception raised

次のステップ

この Colab では、分散コンピューティングを行うための TensorFlow 拡張機能である DTensor について学習しました。チュートリアルでこれらの概念を試すには、DTensor による分散トレーニングをご覧ください。