Trả lời cho sự kiện TensorFlow Everywhere tại địa phương của bạn ngay hôm nay!
Trang này được dịch bởi Cloud Translation API.
Switch to English

Đào tạo máy chủ tham số

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ tay

Tổng quat

Đào tạo máy chủ tham số là một phương pháp song song dữ liệu phổ biến để mở rộng quy mô đào tạo mô hình trên nhiều máy. Một cụm đào tạo máy chủ tham số bao gồm công nhân và máy chủ tham số. Các biến được tạo trên máy chủ tham số và chúng được công nhân đọc và cập nhật trong mỗi bước. Theo mặc định, công nhân đọc và cập nhật các biến này một cách độc lập mà không cần đồng bộ hóa với nhau. Đây là lý do tại sao đôi khi huấn luyện kiểu máy chủ tham số được gọi là huấn luyện không đồng bộ.

Đào tạo máy chủ tham số TensorFlow 2 sử dụng bộ điều phối trung tâm thông qua lớp tf.distribute.experimental.coordinator.ClusterCoordinator .

Trong việc triển khai này, parameter server tác vụ parameter serverworker chạy tf.distribute.Server lắng nghe các yêu cầu từ điều phối viên. Người điều phối tạo tài nguyên, gửi các nhiệm vụ đào tạo, viết các điểm kiểm tra và xử lý các lỗi nhiệm vụ.

Chúng tôi tin rằng kiến ​​trúc này và lớp ClusterCoordinator mới cung cấp một mô hình lập trình linh hoạt hơn và đơn giản hơn.

ClusterCoordinator

Lớp ClusterCoordinator cần hoạt động cùng với đối tượng tf.distribute.Strategy . Đối tượng tf.distribute.Strategy này là cần thiết để chuyển thông tin của cụm và được sử dụng để xác định một bước đào tạo như chúng ta đã thấy trong đào tạo tùy chỉnh với MirroredStrategy . Sau đó, đối tượng ClusterCoordinator gửi việc thực thi các bước đào tạo này tới những người làm việc từ xa. Hiện tại, ClusterCoordinator chỉ hoạt động với tf.distribute.experimental.ParameterServerStrategy .

API quan trọng nhất được cung cấp bởi đối tượng ClusterCoordinatorschedule . API schedule hàng một tf.function và trả về một RemoteValue tương tự trong tương lai ngay lập tức. Các chức năng được xếp hàng đợi sẽ được gửi đến nhân viên từ xa trong các luồng nền và các RemoteValue của họ sẽ được điền không đồng bộ. Vì schedule không yêu cầu chỉ định công nhân, nên chức năng tf.function được chuyển vào có thể được thực thi trên bất kỳ công nhân nào có sẵn. Nếu worker mà nó được thực thi không khả dụng trước khi hoàn thành, thì hàm sẽ được thử lại trên một worker có sẵn khác. Vì thực tế này và thực tế là việc thực thi hàm không phải là nguyên tử, một hàm có thể được thực thi nhiều lần.

Ngoài việc điều phối các chức năng từ xa, ClusterCoordinator cũng giúp tạo các tập dữ liệu về tất cả các nhân viên và xây dựng lại các tập dữ liệu này khi một nhân viên phục hồi sau lỗi.

Hướng dẫn thiết lập

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

Thiết lập cụm

Như đã đề cập ở trên, cụm đào tạo máy chủ tham số yêu cầu một tác vụ điều phối viên chạy chương trình đào tạo của bạn, một hoặc một số tác nhân và tác vụ máy chủ tham số chạy máy chủ TensorFlow, tức là tf.distribute.Server và có thể là một tác vụ đánh giá bổ sung chạy side-car đánh giá (xem phần đánh giá xe bên dưới). Các yêu cầu để thiết lập chúng là:

  • Tác vụ điều phối viên cần biết địa chỉ và cổng của tất cả các máy chủ TensorFlow khác ngoại trừ trình đánh giá.
  • Các worker và các máy chủ tham số cần biết họ cần lắng nghe cổng nào. Để đơn giản, chúng tôi thường chuyển thông tin cụm đầy đủ khi chúng tôi tạo máy chủ TensorFlow trên các tác vụ này.
  • Người đánh giá nhiệm vụ không cần phải biết thiết lập của cụm đào tạo. Nếu có, nó không nên cố gắng kết nối với cụm đào tạo.
  • Công nhân và máy chủ tham số phải có các loại tác vụ tương ứng là “worker” và “ps”. Điều phối viên nên sử dụng "trưởng" làm loại nhiệm vụ vì những lý do kế thừa.

Trong hướng dẫn này, chúng tôi sẽ tạo một cụm trong quá trình để có thể chạy toàn bộ quá trình đào tạo máy chủ tham số trong colab. Chúng tôi sẽ giới thiệu cách thiết lập các cụm thực trong phần sau.

Cụm trong quá trình

Trong hướng dẫn này, chúng tôi sẽ khởi động trước một loạt các máy chủ TensorFlow và kết nối với chúng sau:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Đào tạo với Vòng đào tạo Tùy chỉnh

Vòng lặp huấn luyện tùy chỉnh với tf.distribute.Strategy cung cấp tính linh hoạt cao để xác định các vòng huấn luyện. Hiện tại để đào tạo máy chủ tham số trong TensorFlow 2, chỉ hỗ trợ vòng lặp đào tạo tùy chỉnh. Ở đây chúng tôi sử dụng ParameterServerStrategy để xác định một bước đào tạo và sau đó sử dụng ClusterCoordinator để gửi việc thực hiện các bước đào tạo đến những người làm việc từ xa.

Tạo ParameterServerStrategy

Để viết một bước đào tạo trong vòng lặp đào tạo tùy chỉnh, bước đầu tiên là tạo một ParameterServerStrategy . Chúng tôi sẽ giải thích về variable_partitioner sau.

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

Sau đó, bạn sẽ tạo một mô hình, xác định một tập dữ liệu và một hàm bước như chúng ta đã thấy trong vòng lặp đào tạo với các tf.distribute.Strategy s khác. Bạn có thể tìm thêm chi tiết trong hướng dẫn này. Hãy tạo các thành phần này theo các bước sau:

Thiết lập dữ liệu

Đầu tiên, hãy viết một hàm tạo tập dữ liệu bao gồm logic tiền xử lý được thực hiện bởi các lớp tiền xử lý của Keras. Chúng tôi sẽ tạo các lớp này bên ngoài dataset_fn nhưng áp dụng phép chuyển đổi bên trong dataset_fn vì bạn sẽ bọc dataset_fn thành một tf.function . không cho phép tạo các biến bên trong nó.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

Tạo các ví dụ về đồ chơi trong tập dữ liệu:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Sau đó, chúng tôi tạo tập dữ liệu đào tạo được bao bọc trong dataset_fn:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Xây dựng mô hình

Thứ hai, chúng tôi tạo mô hình và các đối tượng khác. Đảm bảo tạo tất cả các biến trong strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

Xác định bước đào tạo

Thứ ba, tạo bước đào tạo được bao bọc thành một tf.function .:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Trong hàm bước trên, gọi strategy.run step_fnstrategy.reduce step_fn trong step_fn rất hữu ích để hỗ trợ GPU hoặc nhiều nhân viên bản sao trong tương lai, mặc dù chúng có triển khai nhỏ tại thời điểm này.

Cử các bước đào tạo cho nhân viên từ xa

Sau khi tất cả các tính toán được xác định bởi ParameterServerStrategy , chúng tôi sẽ sử dụng lớp ClusterCoordinator để tạo tài nguyên và phân phối các bước đào tạo cho nhân viên từ xa.

Đầu tiên, hãy tạo một đối tượng ClusterCoordinator và chuyển đối tượng chiến lược vào:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Sau đó, chúng tôi tạo một tập dữ liệu cho mỗi công nhân và một trình lặp. Trong per_worker_dataset_fn dưới đây, gói các dataset_fn vào strategy.distribute_datasets_from_function là không bắt buộc nhưng nó sẽ cho phép hỗ trợ tìm nạp trước hiệu quả để GPU liên tục trong tương lai khi GPU được hỗ trợ bởi ParameterServerStrategy .

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Bước cuối cùng là phân phối tính toán cho nhân viên từ xa sử dụng schedule . Phương thức schedule hàng một tf.function và trả về một RemoteValue giống trong tương lai ngay lập tức. Các chức năng được xếp hàng đợi sẽ được gửi đến nhân viên từ xa trong các luồng nền và RemoteValue sẽ được lấp đầy không đồng bộ. Phương thức join có thể được sử dụng để đợi cho đến khi tất cả các hàm đã lên lịch được thực hiện.

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

Đây là cách bạn có thể lấy kết quả của RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

Ngoài ra, bạn có thể khởi chạy tất cả các bước và làm điều gì đó trong khi chờ hoàn thành:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Đối với công tác đào tạo đầy đủ và phục vụ công việc cho ví dụ cụ thể này, hãy kiểm tra này kiểm tra .

Thông tin thêm về tạo tập dữ liệu

Tập dữ liệu trong đoạn mã trên được tạo bằng API create_per_worker_dataset . Nó tạo một tập dữ liệu cho mỗi công nhân và trả về một đối tượng vùng chứa. Bạn có thể gọi phương thức iter trên nó để tạo một trình vòng lặp cho mỗi công nhân. Trình vòng lặp cho mỗi công nhân chứa một trình lặp cho mỗi công nhân và lát tương ứng của một công nhân sẽ được thay thế trong đối số đầu vào của hàm được truyền cho phương thức schedule trước khi hàm được thực thi trên một công nhân cụ thể.

Hiện tại, phương pháp schedule giả định các công nhân là tương đương và do đó giả định các tập dữ liệu trên các công nhân khác nhau là giống nhau ngoại trừ chúng có thể bị xáo trộn khác nhau nếu chúng chứa một thao tác dataset.shuffle . Do đó, chúng tôi cũng khuyên bạn nên lặp lại vô thời hạn các tập dữ liệu và lên lịch cho một số bước hữu hạn thay vì dựa vào OutOfRangeError từ một tập dữ liệu.

Một lưu ý quan trọng khác là bộ dữ liệu tf.data không hỗ trợ tuần tự hóa ngầm định và giải mã hóa trên các ranh giới nhiệm vụ. Vì vậy, điều quan trọng là phải tạo toàn bộ tập dữ liệu bên trong hàm được truyền vào create_per_worker_dataset .

Sharding biến

Sharding biến đề cập đến việc tách một biến thành nhiều biến nhỏ hơn. Chúng tôi gọi các biến nhỏ hơn này là shard s. Các phân đoạn biến đổi có thể hữu ích để phân phối tải mạng khi truy cập các phân đoạn này. Nó cũng hữu ích để phân phối tính toán và lưu trữ của một biến bình thường trên nhiều máy chủ tham số.

Để kích hoạt tính năng sharding biến, bạn có thể truyền vào variable_partitioner khi xây dựng đối tượng ParameterServerStrategy . variable_partitioner sẽ được gọi mỗi khi một biến được tạo và nó được mong đợi sẽ trả về số lượng phân đoạn dọc theo mỗi chiều của biến. Một số out-of-box variable_partitioner s được cung cấp như tf.distribute.experimental.partitioners.FixedShardsPartitioner .

Trong ví dụ trên, chúng tôi sử dụng FixedShardsPartitioner sẽ chia tất cả các biến thành hai phân đoạn và mỗi phân đoạn sẽ được gán cho các máy chủ tham số khác nhau:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Khi một variable_partitioner được chuyển vào và nếu bạn tạo một biến trực tiếp trong strategy.scope() , nó sẽ trở thành một loại vùng chứa với thuộc tính variables cung cấp quyền truy cập vào danh sách các phân đoạn. Trong hầu hết các trường hợp, vùng chứa này sẽ được tự động chuyển đổi thành Tensor bằng cách nối tất cả các phân đoạn. Kết quả là, nó có thể được sử dụng như một biến bình thường. Mặt khác, một số phương thức TensorFlow như tf.nn.embedding_lookup cung cấp triển khai hiệu quả cho loại vùng chứa này và trong các phương pháp này sẽ tránh được việc ghép nối tự động.

Vui lòng xem chuỗi tài liệu API của ParameterServerStrategy để biết thêm chi tiết.

Đánh giá

Có nhiều cách để xác định và chạy một vòng lặp đánh giá trong đào tạo phân tán. Mỗi loại có ưu và nhược điểm riêng như được mô tả bên dưới. Phương pháp đánh giá nội tuyến được khuyến nghị nếu bạn không có sở thích.

Đánh giá nội tuyến

Trong phương pháp này, điều phối viên luân phiên giữa đào tạo và đánh giá và do đó chúng tôi gọi nó là đánh giá nội tuyến. Có một số lợi ích của việc đánh giá nội tuyến. Ví dụ, nó có thể hỗ trợ các mô hình đánh giá lớn và bộ dữ liệu đánh giá mà một tác vụ đơn lẻ không thể chứa. Ví dụ khác, kết quả đánh giá có thể được sử dụng để đưa ra quyết định đào tạo kỷ nguyên tiếp theo.

Có hai cách để thực hiện đánh giá nội tuyến:

  • Đánh giá trực tiếp - Đối với các mô hình nhỏ và tập dữ liệu đánh giá, điều phối viên có thể chạy đánh giá trực tiếp trên mô hình phân tán với tập dữ liệu đánh giá trên điều phối viên:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

  • Đánh giá phân tán - Đối với các mô hình lớn hoặc bộ dữ liệu không thể chạy trực tiếp trên điều phối viên, nhiệm vụ điều phối viên có thể phân phối các nhiệm vụ đánh giá cho người lao động thông qua schedule / phương pháp join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

Đánh giá bên hông xe

Một phương pháp khác được gọi là đánh giá bên xe, đó là tạo ra một nhiệm vụ người đánh giá chuyên dụng đọc nhiều lần các điểm kiểm tra và chạy đánh giá trên một điểm kiểm tra mới nhất. Nó cho phép chương trình đào tạo của bạn kết thúc sớm nếu bạn không cần thay đổi vòng lặp đào tạo của mình dựa trên kết quả đánh giá. Tuy nhiên, nó yêu cầu một nhiệm vụ người đánh giá bổ sung và kiểm tra định kỳ để kích hoạt đánh giá. Sau đây là một vòng đánh giá bên xe có thể có:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Các cụm trong thế giới thực

Trong môi trường sản xuất thực tế, bạn sẽ chạy tất cả các tác vụ trong các quy trình khác nhau trên các máy khác nhau. Cách đơn giản nhất để định cấu hình thông tin cụm trên mỗi tác vụ là đặt biến môi trường "TF_CONFIG" và sử dụng TFConfigClusterResolver để phân tích cú pháp "TF_CONFIG". Để có mô tả chung về các biến môi trường "TF_CONFIG", vui lòng xem hướng dẫn đào tạo phân tán .

Nếu bạn bắt đầu nhiệm vụ đào tạo của mình bằng Kubernetes hoặc các mẫu cấu hình khác, rất có thể các mẫu này đã đặt “TF_CONFIG” cho bạn.

Đặt biến môi trường “TF_CONFIG”

Giả sử bạn có 3 công nhân và 2 máy chủ tham số, “TF_CONFIG” của công nhân 1 có thể là:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

“TF_CONFIG” của người đánh giá có thể là:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})

Phần "cụm" trong chuỗi "TF_CONFIG" ở trên cho người đánh giá là tùy chọn.

Nếu bạn sử dụng cùng một tệp nhị phân cho tất cả các tác vụ

Nếu bạn thích chạy tất cả các tác vụ này bằng cách sử dụng một tệp nhị phân duy nhất, bạn sẽ cần để chương trình của mình phân nhánh thành các vai trò khác nhau ngay từ đầu:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

Đoạn mã sau khởi động máy chủ TensorFlow và đợi:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Xử lý lỗi tác vụ

Công nhân thất bại

Như đã đề cập ở trên, ClusterCoordinator được tích hợp khả năng chịu lỗi đối với lỗi của nhân viên. Sau khi công nhân khôi phục, phần tương ứng của tập dữ liệu được tạo bởi create_per_worker_dataset vẫn còn trong phạm vi sẽ được tạo lại bằng cách gọi dataset_fn ban đầu của nó được chuyển tới create_per_worker_dataset .

Máy chủ tham số hoặc lỗi điều phối viên

Tuy nhiên, khi điều phối viên thấy lỗi máy chủ tham số, nó sẽ xuất hiện lỗi UnavailableError hoặc AbortedError ngay lập tức. Bạn có thể khởi động lại bộ điều phối trong trường hợp này. Bản thân điều phối viên cũng có thể trở nên không khả dụng. Do đó, để không làm mất nhiều tiến độ đào tạo, điều quan trọng là phải kiểm tra các biến mô hình theo định kỳ và tải các biến mô hình từ một điểm kiểm tra, nếu có, trước khi bắt đầu đào tạo. Tiến trình đào tạo có thể được suy ra gần đúng từ optimizer.iterations nếu trình tối ưu hóa được kiểm tra.

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Tìm nạp RemoteValue

Việc tìm nạp một RemoteValue được đảm bảo sẽ thành công nếu một hàm được thực thi thành công. Điều này là do hiện tại giá trị trả về được sao chép ngay lập tức tới bộ điều phối sau khi một hàm được thực thi. Nếu có bất kỳ lỗi worker nào trong quá trình sao chép, chức năng sẽ được thử lại trên một worker hiện có khác. Do đó, nếu bạn muốn tối ưu hóa hiệu suất, bạn có thể lên lịch cho các hàm mà không có giá trị trả về.

Báo cáo lỗi

Khi điều phối viên gặp lỗi như UnavailableError từ các máy chủ tham số hoặc các lỗi ứng dụng khác như Lỗi không hợp InvalidArgument từ tf.debugging.check_numerics , điều phối viên sẽ hủy tất cả các chức năng đang chờ xử lý và xếp hàng đợi trước khi phát sinh lỗi. Lấy tương ứng của họ RemoteValue s sẽ nâng cao một CancelledError .

Sau khi một lỗi được nêu ra, điều phối viên sẽ không đưa ra cùng một lỗi hoặc bất kỳ lỗi nào từ các chức năng bị hủy.

Cải thiện hiệu suất

Có một số lý do có thể xảy ra nếu bạn gặp vấn đề về hiệu suất khi đào tạo với ParameterServerStrategyClusterResolver .

Một lý do phổ biến là các máy chủ tham số có tải không cân bằng và một số máy chủ tham số được tải nặng đã đạt đến dung lượng. Cũng có thể có nhiều nguyên nhân gốc rễ. Một số phương pháp đơn giản để giảm thiểu vấn đề này là

  1. Shard biến mô hình lớn của bạn thông qua việc chỉ định một variable_partitioner khi xây dựng một ParameterServerStrategy .
  2. tránh tạo biến điểm phát sóng được yêu cầu bởi tất cả các máy chủ tham số trong một bước nếu có thể. Ví dụ: sử dụng tốc độ học không đổi hoặc lớp con tf.keras.optimizers.schedules.LearningRateSchedule trong trình tối ưu hóa vì hành vi mặc định là tốc độ học tập sẽ trở thành một biến được đặt trên một máy chủ tham số cụ thể và được yêu cầu bởi tất cả các máy chủ tham số khác trong mỗi bước .
  3. xáo trộn các từ vựng lớn của bạn trước khi chuyển chúng đến các lớp tiền xử lý của Keras.

Một lý do có thể khác cho các vấn đề về hiệu suất là điều phối viên. Việc triển khai schedule / join đầu tiên của chúng tôi dựa trên Python và do đó có thể có chi phí phân luồng. Ngoài ra, độ trễ giữa người điều phối và người lao động có thể lớn. Nếu đúng như vậy, bạn có thể đóng gói nhiều bước vào một tf.function duy nhất:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Chúng tôi sẽ tiếp tục tối ưu hóa trình điều phối và hy vọng hầu hết người dùng không phải đóng gói các bước theo cách thủ công trong tương lai.

Ngoài ra, một mẹo nhỏ để cải thiện hiệu suất là lên lịch cho các hàm không có giá trị trả về như đã giải thích trong phần xử lý lỗi tác vụ ở trên.

Hạn chế đã biết

Hầu hết các hạn chế đã biết được đề cập trong các phần trên. Đây là một bản tóm tắt:

  • os.environment["grpc_fail_fast"]="use_caller" là cần thiết cho mọi tác vụ, kể cả điều phối viên, để khả năng chịu lỗi hoạt động bình thường.
  • Công nhân GPU không được hỗ trợ.
  • Đào tạo máy chủ tham số đồng bộ không được hỗ trợ.
  • ParameterServerStrategy không hoạt động với các API compilefit Keras.
  • ClusterCoordinator.schedule không hỗ trợ đảm bảo lượt truy cập cho tập dữ liệu.
  • Khi ClusterCoordinator.create_per_worker_dataset được sử dụng, toàn bộ tập dữ liệu phải được tạo bên trong hàm được chuyển cho nó.
  • Thông thường cần phải đóng gói nhiều bước vào một chức năng duy nhất để đạt được hiệu suất tối ưu.
  • Nó không được hỗ trợ để tải một save_model qua tf.saved_model.load có chứa các biến phân đoạn. Lưu ý rằng việc tải một save_model như vậy bằng cách sử dụng TensorFlow Serving sẽ hoạt động.
  • Nó không được hỗ trợ để tải một điểm kiểm tra liên kết các biến vị trí trình tối ưu hóa phân đoạn thành một số phân đoạn khác nhau.
  • Nó không được hỗ trợ để khôi phục từ lỗi máy chủ tham số mà không khởi động lại tác vụ điều phối viên.