MLコミュニティデーは11月9日です! TensorFlow、JAXからの更新のために私たちに参加し、より多くの詳細をご覧ください

マルチワーカーCPU / GPUトレーニングを移行する

TensorFlow.orgで表示 GoogleColabで実行 GitHubでソースを表示 ノートブックをダウンロード

このガイドでは、マルチワーカー分散トレーニングワークフローをTensorFlow1からTensorFlow2に移行する方法を示します。

CPU / GPUを使用してマルチワーカートレーニングを実行するには:

設定

いくつかの必要なインポートと、デモンストレーション用の簡単なデータセットから始めます。

# Install tf-nightly as the notebook uses a dataset instance for `Model.fit`
# with `ParameterServerStrategy`, which depends on symbols in TF 2.7.
!pip uninstall -q -y tensorflow keras
!pip install -q tf-nightly
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

あなたは必要になります'TF_CONFIG' TensorFlowで複数のマシン上でのトレーニングのための設定環境変数を。使用'TF_CONFIG'を指定する'cluster''task' s'のアドレスを。 (で詳細情報Distributed_trainingのガイド。)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

使用del変数を削除する文を(しかしTensorFlow 1における実世界のマルチ労働者の訓練では、あなたがこれを行う必要はありません):

del os.environ['TF_CONFIG']

TensorFlow 1:tf.estimatorAPIを使用したマルチワーカー分散トレーニング

次のコードスニペットは、TF1におけるマルチ労働者の研修の標準的なワークフローを示しています。あなたが使用するtf.estimator.Estimatortf.estimator.TrainSpectf.estimator.EvalSpec 、およびtf.estimator.train_and_evaluate配布するAPIをトレーニング:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpu5zsvkn2
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpu5zsvkn2', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.0176871, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-22T20:01:54
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.16660s
INFO:tensorflow:Finished evaluation at 2021-09-22-20:01:55
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.0040814565
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.034454126.
({'loss': 0.0040814565, 'global_step': 3}, [])

TensorFlow 2:配布戦略を使用したマルチワーカートレーニング

TensorFlow 2では、CPUやGPUを用いて複数の労働者を横切ってトレーニングを配布し、そしてTPUは介して行われtf.distribute.Strategy S。

次の例では、2つのそのような戦略を使用する方法を示してtf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy 、複数の労働者とCPU / GPUの訓練のために設計されているどちらもを。

ParameterServerStrategyコーディネーター(採用'chief'このコラボノートブックにおける環境とそれをより親しみやすいなります)。あなたは、ここで実行可能な経験のために不可欠な支持要素を設定するために、ここでいくつかのユーティリティを使用する:あなたは、スレッドは、パラメータ・サーバ(シミュレートするために使用されているプロセス内のクラスタ、作成されます'ps' )と労働者を( 'worker' ) 。パラメータサーバのトレーニングの詳細については、を参照してくださいParameterServerStrategyの持つパラメータサーバのトレーニングチュートリアル。

この例では、最初の定義'TF_CONFIG'と環境変数をtf.distribute.cluster_resolver.TFConfigClusterResolverクラスタ情報を提供します。あなたは、分散訓練、チェックそれが提供している場合のために、クラスタ管理システムを使用している場合は'TF_CONFIG'明示的にこの環境変数を設定する必要はありません。その場合には、すでにあなたのため、。 (設定で詳細情報'TF_CONFIG'での環境変数のセクションTensorFlowを持つ分散型トレーニングガイド。)

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

次に、作成tf.distribute.Server労働者とパラメータサーバ1対1のための秒:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

現実世界では代わりに、すべての出発の、訓練を分散tf.distribute.Serverのコーディネーターに、あなたが使用する複数のマシンを、として指定されているものを"worker"のおよび"ps"の各ます(パラメータサーバ)実行tf.distribute.Server 。で、実世界のセクションでクラスタを参照してください。パラメータサーバーのトレーニングの詳細についてのチュートリアル。

すべての準備が整っして、作成ParameterServerStrategyオブジェクトを:

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

あなたは戦略オブジェクトを作成したら、モデル、オプティマイザ、および他の変数を定義し、Keras呼び出すModel.compileStrategy.scopeトレーニングを配布するためのAPI。 (参照してくださいStrategy.scope詳細については、APIドキュメント。)

あなたは、前方と後方のパスを定義し、たとえば、であなたのトレーニングをカスタマイズしたい場合は、カスタムトレーニングループでトレーニングを参照してください。パラメータサーバのトレーニングの詳細についてのチュートリアル。

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-09-22 20:01:56.897008: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.899102: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.914645: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 2s - loss: 0.1728 - 2s/epoch - 176ms/step
Epoch 2/5
10/10 - 0s - loss: 0.0102 - 73ms/epoch - 7ms/step
Epoch 3/5
10/10 - 0s - loss: 0.0091 - 71ms/epoch - 7ms/step
Epoch 4/5
10/10 - 0s - loss: 0.0083 - 71ms/epoch - 7ms/step
Epoch 5/5
10/10 - 0s - loss: 0.0077 - 73ms/epoch - 7ms/step
<keras.callbacks.History at 0x7fec502068d0>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 12s - loss: 0.0761
2021-09-22 20:01:58.817232: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:58.997047: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
8/10 [=======================>......] - ETA: 0s - loss: 0.2278
2021-09-22 20:01:59.177706: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 50ms/step - loss: 0.2356
{'loss': 0.23556283}

パーティショナ( tf.distribute.experimental.partitioners

ParameterServerStrategy TensorFlowで2つのサポート少ない混乱名前のTensorFlow 1、などの可変分割と申し出同じパーティショナ: - tf.compat.v1.variable_axis_size_partitioner - > tf.distribute.experimental.partitioners.MaxSizePartitioner :最大サイズ下の破片を保持パーティ) 。 - tf.compat.v1.min_max_variable_partitioner - > tf.distribute.experimental.partitioners.MinSizePartitioner :シャードごとの最小サイズを割り当てパーティ。 - tf.compat.v1.fixed_size_partitioner - > tf.distribute.experimental.partitioners.FixedShardsPartitioner :破片の固定数を割り当てパーティ。

別の方法としては、使用することができますMultiWorkerMirroredStrategyオブジェクトを:

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

あなたはして上記で使用される戦略置き換えることができMultiWorkerMirroredStrategyこの戦略でトレーニングを行うためにオブジェクトを。

同様にtf.estimatorため、APIは、 MultiWorkerMirroredStrategyマルチクライアント戦略で、このコラボノートブックに分布してトレーニングを実行するための簡単な方法はありません。したがって、上記のコードをこの戦略に置き換えると、ローカルで実行されることになります。マルチワーカー養成Keras Model.fitと/カスタムトレーニングループチュートリアルでマルチ労働者の訓練を実行する方法を示し'TF_CONFIG'コラボのlocalhost上の2人の労働者で、設定変数。実際には、外部のIPアドレス/ポート上で複数の労働者を作成し、使用する'TF_CONFIG'各ワーカーのためのクラスタ構成を指定する変数を。

次のステップ

マルチ労働者の分散訓練についての詳細を学ぶためにtf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy TensorFlow 2で、次のリソースを考慮してください。