MLコミュニティデーは11月9日です! TensorFlow、JAXからの更新のために私たちに参加し、より多くの詳細をご覧ください

ParameterServerStrategyを使用したパラメーターサーバーのトレーニング

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示 ノートブックをダウンロードする

概要

パラメータサーバの訓練は、複数のマシン上でのモデルの訓練をスケールアップするための一般的なデータ並列方式です。

パラメータサーバの訓練クラスタは、労働者パラメータサーバーで構成されます。変数はパラメーターサーバー上に作成され、各ステップでワーカーによって読み取られて更新されます。デフォルトでは、ワーカーはこれらの変数を相互に同期せずに個別に読み取り、更新します。これは、非同期の訓練と呼ばれる理由時々 、サーバスタイルのトレーニングパラメータです。

TensorFlow 2では、パラメータサーバのトレーニングはによって供給されtf.distribute.experimental.ParameterServerStrategy数千人の労働者にスケールアップクラスタにトレーニング手順を配布するクラス、(パラメータサーバを伴います)。

サポートされているトレーニング方法

サポートされている主なトレーニング方法は2つあります。

ジョブとタスクを含むクラスター

かかわらず、選択肢のAPI(のModel.fit :またはカスタムトレーニングループ)、TensorFlow 2における分散訓練が必要'cluster'いくつかで'jobs' 、およびジョブのそれぞれは、一つ以上持っていること'tasks'

パラメータサーバートレーニングを使用する場合は、次のことをお勧めします。

  • (ジョブ名の持つ一つのコーディネーターの仕事chief
  • 複数のワーカー・ジョブ(ジョブ名のworker )。と
  • 複数のパラメータサーバー・ジョブ(ジョブ名ps

コーディネーターは、リソース、タスクの訓練派遣を作成しながら、チェックポイントを書き込み、およびタスクの失敗、労働者パラメータサーバ取引が実行tf.distribute.Serverコーディネーターからの要求を聞きます。

パラメータサーバの訓練Model.fit API

パラメータサーバのトレーニングModel.fit APIを使用するように調整を必要とtf.distribute.experimental.ParameterServerStrategyオブジェクト、およびtf.keras.utils.experimental.DatasetCreator入力として。同様にModel.fit無い戦略を持つ、または他の戦略の使用法、ワークフローが続くコールバック、準備、モデルを作成し、コンパイルする必要Model.fitコールを。

カスタムトレーニングループを使用したパラメーターサーバートレーニング

カスタムトレーニングループでは、 tf.distribute.experimental.coordinator.ClusterCoordinatorクラスは、コーディネーターのために使用するキーコンポーネントです。

提供される最も重要なAPI ClusterCoordinatorオブジェクトは、 schedule

  • schedule APIは、キューに入れtf.functionし、将来のような返しRemoteValueすぐに。
  • キューに入れられた機能は、バックグラウンドスレッドでリモートワーカーに派遣され、そのRemoteValue sが非同期で入力されます。
  • 以来schedule 、作業者の割り当てを必要としない、 tf.function使用可能な任意のワーカー上で実行することが可能で渡されました。
  • 実行されたワーカーが完了する前に使用できなくなった場合、関数は別の使用可能なワーカーで再試行されます。
  • この事実と関数の実行がアトミックではないという事実のために、関数は複数回実行される可能性があります。

リモート機能を派遣することに加えて、 ClusterCoordinatorまた、すべての労働者にデータセットを作成し、作業員が障害から回復したときにこれらのデータセットを再構築するのに役立ちます。

チュートリアルのセットアップ

チュートリアルでは、に分岐しますModel.fitおよびカスタムトレーニングループパス、あなたのニーズに合ったものを選ぶことができます。 「Xを使用したトレーニング」以外のセクションは、両方のパスに適用されます。

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

クラスターのセットアップ

上述したように、パラメータサーバの訓練クラスタは、コーディネータあなたのトレーニングプログラムを実行するタスク、一つまたは複数の労働者とTensorFlow servers-実行パラメータサーバタスクが必要ですtf.distribute.Server側車の評価を実行し、おそらく追加の評価タスク-およびを(以下のサイドカー評価セクションを参照してください)。それらを設定するための要件は次のとおりです。

  • コーディネータータスクは、エバリュエーターを除く他のすべてのTensorFlowサーバーのアドレスとポートを知っている必要があります。
  • ワーカーとパラメーターサーバーは、リッスンする必要のあるポートを知る必要があります。簡単にするために、通常、これらのタスクでTensorFlowサーバーを作成するときに、完全なクラスター情報を渡すことができます。
  • 評価者タスクは、トレーニングクラスターの設定を知る必要はありません。含まれている場合は、トレーニングクラスターへの接続を試行しないでください。
  • 労働者とパラメータサーバは、タスクの種類を持っている必要があり"worker""ps"はそれぞれ、。コーディネーターは、使用すべき"chief" 、レガシーの理由でタスクタイプとして。

このチュートリアルでは、パラメーターサーバーのトレーニング全体をColabで実行できるように、インプロセスクラスターを作成します。あなたは、セットアップする方法について説明します本当のクラスタの後のセクションで。

インプロセスクラスター

まず、事前に複数のTensorFlowサーバーを作成し、後でそれらに接続します。注:これはのみ、このチュートリアルのデモンストレーションを目的としており、かつ実際の訓練でのサーバが上で開始されることを"worker""ps"のマシン。

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

インプロセスクラスタ設定は、しばしばなど、ユニットテストで使用され、ここ

地元のテストのための別のオプションが出て、ローカル・マシン・チェックのプロセスを起動することですKeras付きマルチ労働者の訓練をこのアプローチの例のために。

ParameterServerStrategyをインスタンス化します

あなたがトレーニングコードに飛び込む前に、インスタンス化させParameterServerStrategyオブジェクトを。これは関係なく、あなたが進んでいるかどうかに必要であることに注意してくださいModel.fitまたはカスタムトレーニングループ。 variable_partitioner引数は説明する変数のシャーディングセクション

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

トレーニングにGPUを使用するには、各ワーカーに表示されるGPUを割り当てます。 ParameterServerStrategyすべての労働者が使用可能なGPUの数が同じでなければならないという制限で、各ワーカー上で使用可能なすべてのGPUを使用します。

可変シャーディング

変数シャーディングは、破片と呼ばれる複数の小さな変数に変数を分割することをいいます。可変シャーディングは、これらのシャードにアクセスするときにネットワーク負荷を分散するのに役立つ場合があります。正規変数の計算と保存を複数のパラメーターサーバーに分散することも役立ちます。

変数シャーディングを有効にするには、渡すことができvariable_partitioner構築する際にParameterServerStrategyオブジェクトを。 variable_partitioner変数が作成されるたびに呼び出されます、変数の各次元に沿った破片の数を返すことが期待されています。いくつかのアウトオブボックスvariable_partitioner Sは、このようなとして提供されているtf.distribute.experimental.partitioners.MinSizePartitioner 。のようなサイズベースのパーティを使用することをお勧めしtf.distribute.experimental.partitioners.MinSizePartitionerモデル訓練の速度にマイナスの影響を与える可能性が小さな変数を、分割を避けるために。

ときvariable_partitioner渡され、あなたが直下に変数を作成した場合strategy.scope()それが持つコンテナタイプになりますvariables破片のリストへのアクセスを提供しプロパティ。ほとんどの場合、このコンテナはすべてのシャードを連結することで自動的にTensorに変換されます。その結果、正規変数として使用できます。一方、のようないくつかのTensorFlow方法tf.nn.embedding_lookupこのコンテナのタイプのための効率的な実装を提供し、これらの方法で自動連結が回避されます。

APIドキュメントを参照してくださいtf.distribute.experimental.ParameterServerStrategy詳細については。

トレーニングModel.fit

Kerasは、経由で簡単に使用できるトレーニングAPIを提供Model.fitそのハンドル訓練オーバーライドの柔軟性を備えたフードの下ループ、 train_stepなTensorBoardのための貯蓄のチェックポイントの節約や要約などの機能を提供し、コールバック、。 Model.fit 、同じトレーニングコードは、戦略オブジェクトのシンプルスワップと他の戦略のために使用することができます。

入力データ

Model.fitパラメータサーバの訓練では、入力されたデータは、タイプの単一の引数とる呼び出し可能で提供されている必要がありtf.distribute.InputContext 、と返しtf.data.Dataset 。次に、作成tf.keras.utils.experimental.DatasetCreatorようになりますオブジェクトcallableし、オプションのtf.distribute.InputOptionsを経由してオブジェクトをinput_options引数。

シャッフルやリピートデータをパラメータサーバのトレーニングで、かつ指定することをお勧めしていることを注意steps_per_epochfitライブラリはエポックの境界を知っているように、コール。

見てください、分散入力の詳細については、チュートリアルInputContext引数を。

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

コードdataset_fnワーカーマシンのそれぞれに、通常CPUである入力装置に呼び出されます。

モデルの構築とコンパイル

さて、あなたは作成されますtf.keras.Model -a些細なtf.keras.models.Sequentialデモンストレーション目的-続きでのモデルModel.compileなどのようなオプティマイザ、メトリクス、またはパラメータなどのコンポーネント、組み込むために呼び出しsteps_per_execution

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

コールバックとトレーニング

連絡する前にmodel.fit実際の訓練のために、のような一般的なタスクのために必要なコールバックを用意してみましょう:

  • ModelCheckpoint :モデルの重みを保存します。
  • BackupAndRestore :クラスタは(中止やプリエンプションなど)が利用できないを経験するかどうかわからトレーニングの進捗状況を自動的にバックアップし、回収されるようにします。また
  • TensorBoard :TensorBoardツールで可視化されます要約ファイルの中にプログレスレポートを保存します。
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

直接の使用ClusterCoordinator (オプション)

あなたが選択した場合でもModel.fitトレーニングパスを、必要に応じてインスタンス化できtf.distribute.experimental.coordinator.ClusterCoordinatorあなたが労働者に実行されることを希望他の機能をスケジュールするオブジェクトを。参照カスタムトレーニングループとトレーニングの詳細および例については、セクションを。

カスタムトレーニングループを使用したトレーニング

でカスタムトレーニングループを使用tf.distribute.Strategyトレーニングループを定義するための大きな柔軟性を提供します。 ParameterServerStrategy (上記のように定義strategy )、あなたが使用するtf.distribute.experimental.coordinator.ClusterCoordinatorリモートワーカーへのトレーニングステップの実行を派遣します。

その後、あなたは、モデルを作成しますあなたが他とトレーニングループで行ったように、データセットとステップ関数を定義tf.distribute.Strategy秒。あなたは、より詳細な情報を確認することができtf.distribute.Strategyを持つカスタムトレーニングチュートリアル。

プリフェッチ効率的なデータセットを確保するために、推奨される分散型データセットの作成のAPIを使用して言及したリモートワーカーのに派遣訓練手順以下のセクション。また、呼び出していることを確認してくださいStrategy.run内側にworker_fn労働者に割り当てられたのGPUをフルに活用します。残りの手順は、GPUを使用する場合と使用しない場合のトレーニングで同じです。

次の手順でこれらのコンポーネントを作成しましょう。

データを設定する

まず、によって実施される論理前処理を含むデータセットを作成する関数書き込みKeras前処理層

あなたは外でこれらの層を作成しますdataset_fnが、内部で変換を適用dataset_fnあなたがラップするため、 dataset_fntf.function変数がその中に作成することはできません。

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

データセットにおもちゃの例を生成します。

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

その後、に包まれたトレーニングデータセットを作成dataset_fn

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

モデルを構築する

次に、モデルとその他のオブジェクトを作成します。下のすべての変数を作成することを確認しますstrategy.scope

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

使用することを確認してみましょうFixedShardsPartitioner 2つの破片にすべての変数を分割し、各シャードは異なるパラメータのサーバーに割り当てられました

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

トレーニングステップを定義する

第三に、に包まれたトレーニングステップ作成tf.function

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

上記のトレーニングステップ関数では、呼び出しStrategy.runStrategy.reduce中でstep_fn 、作業者ごとに複数のGPUをサポートすることができます。労働者はGPUが割り当てられている場合は、 Strategy.run複数のレプリカ上のデータセットを配布します。

リモートワーカーへのトレーニング手順のディスパッチ

すべての計算がで定義された後ParameterServerStrategy 、あなたが使用するtf.distribute.experimental.coordinator.ClusterCoordinatorリソースを作成し、リモートワーカーへのトレーニングの手順を配布するクラスを。

まずは作成してみましょうClusterCoordinatorオブジェクトをと戦略オブジェクトに渡します。

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

次に、ワーカーごとのデータセットとイテレーターを作成します。でper_worker_dataset_fn以下、ラップdataset_fnstrategy.distribute_datasets_from_functionシームレスのGPUへの効率的なプリフェッチを許可することをお勧めします。

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

最後のステップは、使用してリモートワーカーに計算を分配することであるClusterCoordinator.schedule

  • schedule方法は、キューに入れtf.functionし、将来のような返しRemoteValueすぐに。キューに入れられた機能は、バックグラウンドスレッドでリモートワーカーに派遣され、 RemoteValue非同期に入力されます。
  • join方法( ClusterCoordinator.join )すべてのスケジュール機能が実行されるまで待つために使用することができます。
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

ここでは、結果フェッチできる方法ですRemoteValue

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

または、すべてのステップを起動して、完了を待っている間に何かを実行することもできます。

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

この特定の例の完全なトレーニングとサービス提供のワークフローでは、このチェックしてみてくださいテストを

データセット作成の詳細

上記のコードでデータセットを使用して作成されClusterCoordinator.create_per_worker_dataset )APIを。ワーカーごとに1つのデータセットを作成し、コンテナーオブジェクトを返します。あなたは呼び出すことができますiterあたりの労働者イテレータを作成するために方法を。単位の作業者は、反復子は、作業者ごとに反復子を含んでおり、作業者の対応するスライスを、関数に渡されたの入力引数で置換されClusterCoordinator.schedule機能が特定のワーカー上で実行される前に、方法。

現在、 ClusterCoordinator.schedule方法は、労働者が同等であり、したがって、別の労働者にデータセットを前提として想定して、彼らが含まれている場合、彼らは異なってシャッフルすることができる以外は同じですDataset.shuffle操作を。このため、また、データセットが無限に繰り返されるようにすることをお勧めします、あなたが代わりに頼るの有限数のステップスケジュールOutOfRangeErrorデータセットからを。

もう一つの重要な注意事項は、ということですtf.dataデータセットは、タスクの境界を越えて、暗黙的なシリアライズとデシリアライズをサポートしていません。だから、に渡された関数の内部でデータセット全体を作成することが重要であるClusterCoordinator.create_per_worker_dataset

評価

分散トレーニングで評価ループを定義して実行する方法は複数あります。以下に説明するように、それぞれに独自の長所と短所があります。好みがない場合は、インライン評価方法をお勧めします。

インライン評価

この方法では、訓練および評価、従ってそれと呼ばれるインライン評価との間のコーディネータ交互。

インライン評価にはいくつかの利点があります。例えば:

  • 単一のタスクでは保持できない大規模な評価モデルと評価データセットをサポートできます。
  • 評価結果は、次のエポックをトレーニングするための決定を行うために使用できます。

インライン評価を実装するには、直接評価と分散評価の2つの方法があります。

  • 直接評価:小型モデルと評価データセットの場合、コーディネーターは、コーディネーターの評価データセットを持つ分散モデル上で直接評価を実行することができます。
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • 分散評価:コーディネーター上で直接実行するために実行不可能です大規模なモデルやデータセットの場合は、コーディネーターのタスクが経由して労働者に評価タスクを配布することができClusterCoordinator.schedule / ClusterCoordinator.join方法:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

サイドカー評価

もう一つの方法は、あなたが繰り返しチェックポイントを読み取り、最新のチェックポイントで評価を実行する専用の評価タスクを作成する側車の評価と呼ばれています。評価結果に基づいてトレーニングループを変更する必要がない場合は、トレーニングプログラムを早期に終了できます。ただし、評価をトリガーするには、追加の評価タスクと定期的なチェックポイントが必要です。以下は、可能なサイドカー評価ループです。

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

実世界のクラスター

実際の実稼働環境では、すべてのタスクをさまざまなマシンのさまざまなプロセスで実行します。各タスクの[Configureクラスタ情報への最も簡単な方法は、設定することで"TF_CONFIG"環境変数を使用してtf.distribute.cluster_resolver.TFConfigClusterResolver解析する"TF_CONFIG"

一般的な説明については"TF_CONFIG"環境変数を参照してください分散トレーニングガイド。

あなたがKubernetesまたは他のコンフィギュレーションテンプレートを使用して、トレーニングタスクを起動した場合、それは、これらのテンプレートが既に設定されている可能性が非常に高い“TF_CONFIG"あなたのため。

設定する"TF_CONFIG"環境変数を

あなたは3人の労働者と2台のパラメータサーバーがあるとし、 "TF_CONFIG"労働者1のは、ことができます:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG"評価のは、ことができます:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster"上で一部"TF_CONFIG"評価の文字列は任意です。

すべてのタスクに同じバイナリを使用する場合

単一のバイナリを使用してこれらすべてのタスクを実行する場合は、最初にプログラムをさまざまな役割に分岐させる必要があります。

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

次のコードはTensorFlowサーバーを起動し、待機します。

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

タスクの失敗の処理

労働者の失敗

tf.distribute.experimental.coordinator.ClusterCoordinatorまたはModel.fit内蔵の労働者の障害のためにフォールトトレランスを提供します。労働者の回復時には、以前に提供されたデータセットの機能(のいずれかにClusterCoordinator.create_per_worker_datasetカスタムトレーニングforループ、またはtf.keras.utils.experimental.DatasetCreatorModel.fit )再作成してデータセットに労働者に呼び出されます。

パラメータサーバーまたはコーディネータの障害

しかし、コーディネータがパラメータサーバのエラーを見たとき、それが発生しますUnavailableErrorまたはAbortedErrorすぐに。この場合、コーディネーターを再起動できます。コーディネーター自体も利用できなくなる可能性があります。したがって、トレーニングの進行状況を失わないようにするために、特定のツールをお勧めします。

  • Model.fit 、あなたが使用する必要がありますBackupAndRestore自動進行の節約と復元を処理するコールバックを。参照してください。コールバックとトレーニング例えば上記のセクションを。

  • カスタムトレーニングループの場合、トレーニングを開始する前に、モデル変数を定期的にチェックポイントし、チェックポイントがある場合はチェックポイントからモデル変数をロードする必要があります。トレーニングの進捗状況は以下から約推測できるoptimizer.iterationsオプティマイザはチェックポイントされている場合:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

フェッチRemoteValue

フェッチRemoteValue機能が正常に実行された場合に成功することが保証されています。これは、現在、関数の実行後すぐに戻り値がコーディネーターにコピーされるためです。コピー中にワーカー障害が発生した場合、関数は別の使用可能なワーカーで再試行されます。したがって、パフォーマンスを最適化する場合は、戻り値なしで関数をスケジュールできます。

エラー報告

コーディネーターは、このようなエラーが見たらUnavailableErrorなどのパラメータサーバーや他のアプリケーションエラーからのInvalidArgumentからtf.debugging.check_numerics 、それはエラーを上げる前に、すべての保留とキューに入れられた機能をキャンセルします。それらに対応するフェッチRemoteValueのことは高くなりますCancelledError

エラーが発生した後、コーディネーターは同じエラーまたはキャンセルされた関数からのエラーを発生させません。

パフォーマンスの向上

あなたはパフォーマンスの問題を見れば、あなたがして訓練するいくつかの理由がありますParameterServerStrategyClusterResolver

一般的な理由の1つは、パラメーターサーバーの負荷が不均衡であり、一部の高負荷のパラメーターサーバーが容量に達していることです。複数の根本原因も考えられます。この問題を軽減するためのいくつかの簡単な方法は次のとおりです。

  1. 指定経由で大規模なモデル変数をシャードvariable_partitioner構築する際にParameterServerStrategy
  2. 可能であれば、すべてのパラメーターサーバーで必要なホットスポット変数を1つのステップで作成することは避けてください。例えば、一定の学習率やサブクラスを使用tf.keras.optimizers.schedules.LearningRateScheduleデフォルトの動作は、学習率は、特定のパラメータサーバ上に置かれ、変数になり、各ステップで他のすべてのパラメータのサーバから要求されたということですので、オプティマイザで。
  3. 大きな語彙をシャッフルしてから、Keras前処理レイヤーに渡します。

パフォーマンスの問題のもう1つの考えられる理由は、コーディネーターです。あなたの最初の実装schedule / join Pythonベースであるため、オーバーヘッドスレッディングている可能性があります。また、コーディネーターとワーカーの間の待ち時間が長くなる可能性があります。このような場合は、

  • Model.fit 、あなたが設定することができsteps_per_executionに設けた引数Model.compile 1よりも大きな値にします。

  • カスタムトレーニングループのために、あなたは、単一の中に複数のステップをパックすることができますtf.function

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

ライブラリがさらに最適化されているため、将来、ほとんどのユーザーが手動でステップをパックする必要がなくなることを願っています。

さらに、パフォーマンスを向上させるための小さなトリックは、上記のタスク障害の処理のセクションで説明したように、戻り値なしで関数をスケジュールすることです。

既知の制限

既知の制限のほとんどは、上記のセクションですでに説明されています。このセクションでは、要約を提供します。

ParameterServerStrategy一般

  • os.environment["grpc_fail_fast"]="use_caller"適切にフォールトトレランスを機能させるために、コーディネーターを含むすべてのタスクに必要とされています。
  • 同期パラメータサーバートレーニングはサポートされていません。
  • 通常、最適なパフォーマンスを実現するには、複数のステップを1つの関数にまとめる必要があります。
  • 経由saved_modelをロードするためにサポートされていませんtf.saved_model.loadシャード変数を含みます。 TensorFlowServingを使用してこのようなsaved_modelをロードすることは機能することが期待されていることに注意してください。
  • シャードオプティマイザースロット変数を含むチェックポイントを異なる数のシャードにロードすることはサポートされていません。
  • コーディネータータスクを再起動せずにパラメーターサーバーの障害から回復することはサポートされていません。
  • 使用tf.lookup.StaticHashTable (一般的に、いくつかによって使用されるtf.keras.layers.experimental.preprocessingような層、 IntegerLookupStringLookup 、及びTextVectorizationパラメータサーバトレーニングこの時、コーディネータ上に配置されたリソースでの結果)。これは、ワーカーからコーディネーターへのルックアップRPCのパフォーマンスに影響します。これは、現在対処すべき最優先事項です。

Model.fit詳細

  • steps_per_epoch引数が必要であるModel.fit 。エポックで適切な間隔を提供する値を選択できます。
  • ParameterServerStrategyパフォーマンス上の理由から、バッチレベルの呼び出しを持つカスタムコールバックをサポートしていません。あなたは適切に選んでエポックレベルの呼び出しにこれらの呼び出しを変換する必要がありsteps_per_epoch彼らはすべてのと呼ばれているように、 steps_per_epochステップの数。組み込みのコールバックは影響を受けません。バッチレベルの呼び出しは、パフォーマンスが向上するように変更されています。バッチレベルの呼び出しのサポートParameterServerStrategy計画されています。
  • 同じ理由で、他の戦略とは異なり、プログレスバーとメトリックはエポック境界でのみログに記録されます。
  • run_eagerlyサポートされていません。

カスタムトレーニングループの詳細