Đào tạo máy chủ tham số với ParameterServerStrategy

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Tổng quat

Đào tạo máy chủ tham số là một phương pháp song song dữ liệu phổ biến để mở rộng quy mô đào tạo mô hình trên nhiều máy.

Một cụm đào tạo máy chủ tham số bao gồm công nhânmáy chủ tham số . Các biến được tạo trên máy chủ tham số và chúng được công nhân đọc và cập nhật trong mỗi bước. Theo mặc định, công nhân đọc và cập nhật các biến này một cách độc lập mà không đồng bộ hóa với nhau. Đây là lý do tại sao đôi khi huấn luyện kiểu máy chủ tham số được gọi là huấn luyện không đồng bộ .

Trong TensorFlow 2, đào tạo máy chủ tham số được cung cấp bởi lớp tf.distribute.experimental.ParameterServerStrategy , phân phối các bước đào tạo cho một cụm có quy mô lên đến hàng nghìn công nhân (đi kèm với máy chủ tham số).

Các phương pháp đào tạo được hỗ trợ

Có hai phương pháp đào tạo được hỗ trợ chính:

Một cụm với các công việc và nhiệm vụ

Bất kể API được lựa chọn là gì ( Model.fit hoặc vòng đào tạo tùy chỉnh), đào tạo phân tán trong TensorFlow 2 bao gồm: một 'cluster' với một số 'jobs' và mỗi công việc có thể có một hoặc nhiều 'tasks' .

Khi sử dụng đào tạo máy chủ tham số, bạn nên có:

  • Một công việc điều phối viên (có chief tên công việc)
  • Nhiều công việc công nhân (công việc tên worker ); và
  • Nhiều công việc máy chủ tham số (tên công việc ps )

Trong khi điều phối viên tạo tài nguyên, gửi các nhiệm vụ đào tạo, ghi các điểm kiểm tra và xử lý các lỗi tác vụ, các workermáy chủ tham số chạy tf.distribute.Server lắng nghe các yêu cầu từ điều phối viên.

Đào tạo máy chủ tham số với API Model.fit

Đào tạo máy chủ tham số với API Model.fit yêu cầu điều phối viên sử dụng đối tượng tf.distribute.experimental.ParameterServerStrategytf.keras.utils.experimental.DatasetCreator làm đầu vào. Tương tự như việc sử dụng Model.fit không có chiến lược hoặc với các chiến lược khác, quy trình làm việc bao gồm việc tạo và biên dịch mô hình, chuẩn bị các lệnh gọi lại, sau đó là lệnh gọi Model.fit .

Đào tạo máy chủ tham số với vòng lặp đào tạo tùy chỉnh

Với các vòng lặp huấn luyện tùy chỉnh, lớp tf.distribute.experimental.coordinator.ClusterCoordinator là thành phần chính được sử dụng cho điều phối viên.

API quan trọng nhất được cung cấp bởi đối tượng ClusterCoordinatorschedule :

  • API schedule hàng một chức năng tf. và trả về một tf.function tương tự trong tương RemoteValue ngay lập tức.
  • Các chức năng được xếp hàng đợi sẽ được gửi đến nhân viên từ xa trong các luồng nền và các RemoteValue của chúng sẽ được điền không đồng bộ.
  • schedule không yêu cầu chỉ định công nhân, nên chức tf.function được chuyển vào có thể được thực thi trên bất kỳ công nhân nào có sẵn.
  • Nếu worker mà nó được thực thi không khả dụng trước khi hoàn thành, thì hàm sẽ được thử lại trên một worker có sẵn khác.
  • Vì thực tế này và thực tế là việc thực thi hàm không phải là nguyên tử, một hàm có thể được thực thi nhiều hơn một lần.

Ngoài việc điều phối các chức năng từ xa, ClusterCoordinator cũng giúp tạo các tập dữ liệu về tất cả các nhân viên và xây dựng lại các tập dữ liệu này khi một nhân viên phục hồi sau lỗi.

Hướng dẫn thiết lập

Hướng dẫn sẽ phân nhánh thành Model.fit và các đường dẫn vòng lặp đào tạo tùy chỉnh, và bạn có thể chọn một trong những đường dẫn phù hợp với nhu cầu của mình. Các phần không phải là "Đào tạo với X" có thể áp dụng cho cả hai cách.

pip install portpicker

Thiết lập cụm

Như đã đề cập ở trên, một cụm đào tạo máy chủ tham số yêu cầu một tác vụ điều phối viên chạy chương trình đào tạo của bạn, một hoặc một số công nhân và tác vụ máy chủ tham số chạy máy chủ TensorFlow— tf.distribute.Server —và có thể là một tác vụ đánh giá bổ sung chạy đánh giá bên xe (xem phần đánh giá xe bên dưới). Các yêu cầu để thiết lập chúng là:

  • Tác vụ điều phối viên cần biết địa chỉ và cổng của tất cả các máy chủ TensorFlow khác ngoại trừ trình đánh giá.
  • Các worker và các máy chủ tham số cần biết họ cần lắng nghe cổng nào. Để đơn giản, bạn thường có thể chuyển thông tin cụm đầy đủ khi tạo máy chủ TensorFlow cho các tác vụ này.
  • Người đánh giá nhiệm vụ không cần phải biết thiết lập của cụm đào tạo. Nếu có, nó không nên cố gắng kết nối với cụm đào tạo.
  • Công nhân và máy chủ tham số phải có các loại tác vụ tương ứng là "worker""ps" . Điều phối viên nên sử dụng "chief" làm loại nhiệm vụ vì các lý do kế thừa.

Trong hướng dẫn này, bạn sẽ tạo một cụm trong quá trình để có thể chạy toàn bộ quá trình đào tạo máy chủ tham số trong Colab. Bạn sẽ học cách thiết lập các cụm thực trong phần sau.

Cụm trong quá trình

Bạn sẽ bắt đầu bằng cách tạo trước một số máy chủ TensorFlow và kết nối với chúng sau. Lưu ý rằng điều này chỉ nhằm mục đích trình diễn của hướng dẫn này và trong quá trình đào tạo thực tế, các máy chủ sẽ được khởi động trên các máy "worker""ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Thiết lập cụm trong quá trình thường được sử dụng trong kiểm thử đơn vị, chẳng hạn như ở đây .

Một tùy chọn khác để kiểm tra cục bộ là khởi chạy các quy trình trên máy cục bộ — hãy xem đào tạo Đa nhân viên với Keras để biết ví dụ về cách tiếp cận này.

Khởi tạo một ParameterServerStrategy

Trước khi bạn đi sâu vào mã đào tạo, hãy khởi tạo một đối tượng ParameterServerStrategy . Lưu ý rằng điều này là cần thiết bất kể bạn đang tiếp tục với Model.fit hay một vòng đào tạo tùy chỉnh. Đối variable_partitioner sẽ được giải thích trong phần Variable sharding .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Để sử dụng GPU cho đào tạo, hãy phân bổ GPU hiển thị cho từng nhân viên. ParameterServerStrategy sẽ sử dụng tất cả các GPU có sẵn trên mỗi nhân viên, với hạn chế là tất cả nhân viên phải có cùng số lượng GPU khả dụng.

Sharding biến

Sharding biến đề cập đến việc tách một biến thành nhiều biến nhỏ hơn, được gọi là phân đoạn . Các phân đoạn biến đổi có thể hữu ích để phân phối tải mạng khi truy cập các phân đoạn này. Nó cũng hữu ích để phân phối tính toán và lưu trữ của một biến bình thường trên nhiều máy chủ tham số.

Để kích hoạt tính năng sharding biến, bạn có thể chuyển vào variable_partitioner khi xây dựng đối tượng ParameterServerStrategy . variable_partitioner vùng sẽ được gọi mỗi khi một biến được tạo và nó được mong đợi sẽ trả về số phân đoạn dọc theo mỗi chiều của biến. Một số variable_partitioner có sẵn được cung cấp chẳng hạn như tf.distribute.experimental.partitioners.MinSizePartitioner . Bạn nên sử dụng các trình phân vùng dựa trên kích thước như tf.distribute.experimental.partitioners.MinSizePartitioner để tránh phân vùng các biến nhỏ, điều này có thể có tác động tiêu cực đến tốc độ đào tạo mô hình.

Khi một variable_partitioner được chuyển vào và nếu bạn tạo một biến trực tiếp trong strategy.scope() , nó sẽ trở thành một loại vùng chứa với thuộc tính variables cung cấp quyền truy cập vào danh sách các phân đoạn. Trong hầu hết các trường hợp, vùng chứa này sẽ được tự động chuyển đổi thành Tensor bằng cách nối tất cả các phân đoạn. Do đó, nó có thể được sử dụng như một biến bình thường. Mặt khác, một số phương thức TensorFlow như tf.nn.embedding_lookup cung cấp triển khai hiệu quả cho loại vùng chứa này và trong các phương pháp này sẽ tránh được việc ghép nối tự động.

Vui lòng xem tài liệu API của tf.distribute.experimental.ParameterServerStrategy để biết thêm chi tiết.

Đào tạo với Model.fit

Keras cung cấp một API đào tạo dễ sử dụng thông qua Model.fit để xử lý vòng lặp đào tạo ẩn, với tính linh hoạt của train_step và callback, cung cấp các chức năng như lưu điểm kiểm tra hoặc lưu tóm tắt cho TensorBoard. Với Model.fit , cùng một mã đào tạo có thể được sử dụng cho các chiến lược khác với một hoán đổi đơn giản của đối tượng chiến lược.

Dữ liệu đầu vào

Model.fit có đào tạo máy chủ tham số yêu cầu dữ liệu đầu vào được cung cấp trong một tệp có thể gọi nhận một đối số duy nhất của kiểu tf.distribute.InputContext và trả về một tf.data.Dataset . Sau đó, tạo một đối tượng tf.keras.utils.experimental.DatasetCreatorcallable như vậy và một đối tượng tf.distribute.InputOptions tùy chọn thông qua đối số input_options .

Lưu ý rằng bạn nên xáo trộn và lặp lại dữ liệu với đào tạo máy chủ tham số và chỉ định steps_per_epoch trong lệnh gọi fit để thư viện biết ranh giới kỷ nguyên.

Vui lòng xem hướng dẫn nhập phân tán để biết thêm thông tin về đối số InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Mã trong dataset_fn sẽ được gọi trên thiết bị đầu vào, thường là CPU, trên mỗi máy công nhân.

Xây dựng và biên dịch mô hình

Bây giờ, bạn sẽ tạo một tf.keras.Model —một mô hình nhỏ tf.keras.models.Sequential cho mục đích trình diễn — tiếp theo là một lệnh gọi Model.compile để kết hợp các thành phần, chẳng hạn như trình tối ưu hóa, số liệu hoặc các tham số như steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Gọi lại và đào tạo

Trước khi bạn gọi model.fit cho khóa đào tạo thực tế, hãy chuẩn bị các lệnh gọi lại cần thiết cho các tác vụ phổ biến, chẳng hạn như:

  • ModelCheckpoint : để lưu trọng số của mô hình.
  • BackupAndRestore : để đảm bảo tiến trình đào tạo được tự động sao lưu và phục hồi nếu cụm gặp sự cố không khả dụng (chẳng hạn như hủy bỏ hoặc ưu tiên); hoặc là
  • TensorBoard : để lưu các báo cáo tiến độ thành các tệp tóm tắt, được hiển thị trực quan trong công cụ TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Sử dụng trực tiếp với ClusterCoordinator (tùy chọn)

Ngay cả khi bạn chọn đường dẫn đào tạo Model.fit , bạn có thể tùy chọn khởi tạo đối tượng tf.distribute.experimental.coordinator.ClusterCoordinator để lên lịch cho các chức năng khác mà bạn muốn thực thi trên worker. Xem phần Đào tạo với vòng lặp đào tạo tùy chỉnh để biết thêm chi tiết và ví dụ.

Đào tạo với một vòng đào tạo tùy chỉnh

Sử dụng các vòng huấn luyện tùy chỉnh với tf.distribute.Strategy cung cấp sự linh hoạt tuyệt vời để xác định các vòng huấn luyện. Với ParameterServerStrategy được định nghĩa ở trên (dưới dạng strategy ), bạn sẽ sử dụng tf.distribute.experimental.coordinator.ClusterCoordinator để điều phối việc thực hiện các bước đào tạo cho nhân viên từ xa.

Sau đó, bạn sẽ tạo một mô hình, xác định một tập dữ liệu và một hàm bước, như bạn đã thực hiện trong vòng lặp đào tạo với các tf.distribute.Strategy khác. Bạn có thể tìm thêm chi tiết trong hướng dẫn Đào tạo tùy chỉnh với tf.distribute.Strategy .

Để đảm bảo tìm nạp trước tập dữ liệu hiệu quả, hãy sử dụng các API tạo tập dữ liệu phân tán được đề xuất được đề cập trong phần đào tạo Công văn cho nhân viên từ xa bên dưới. Ngoài ra, hãy nhớ gọi Strategy.run bên trong worker_fn để tận dụng tối đa GPU được phân bổ cho worker. Các bước còn lại đều giống nhau đối với đào tạo có hoặc không có GPU.

Hãy tạo các thành phần này theo các bước sau:

Thiết lập dữ liệu

Đầu tiên, hãy viết một hàm tạo tập dữ liệu bao gồm logic tiền xử lý được thực hiện bởi các lớp tiền xử lý của Keras .

Bạn sẽ tạo các lớp này bên ngoài dataset_fn nhưng áp dụng phép biến đổi bên trong dataset_fn , vì bạn sẽ bọc dataset_fn thành một hàm tf. tf.function năng này không cho phép tạo các biến bên trong nó.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Tạo các ví dụ đồ chơi trong tập dữ liệu:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Sau đó, tạo tập dữ liệu đào tạo được bao bọc trong dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Xây dựng mô hình

Tiếp theo, tạo mô hình và các đối tượng khác. Đảm bảo tạo tất cả các biến trong strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Hãy xác nhận rằng việc sử dụng FixedShardsPartitioner chia tất cả các biến thành hai phân đoạn và mỗi phân đoạn được gán cho các máy chủ tham số khác nhau:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Xác định bước đào tạo

Thứ ba, tạo bước đào tạo được bao bọc thành một tf.function .:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Trong hàm bước đào tạo ở trên, việc gọi Strategy.runStrategy.reduce trong step_fn có thể hỗ trợ nhiều GPU cho mỗi nhân viên. Nếu nhân viên có GPU được phân bổ, Strategy.run sẽ phân phối bộ dữ liệu trên nhiều bản sao.

Cử các bước đào tạo cho nhân viên từ xa

Sau khi tất cả các phép tính được xác định bởi ParameterServerStrategy , bạn sẽ sử dụng lớp tf.distribute.experimental.coordinator.ClusterCoordinator để tạo tài nguyên và phân phối các bước đào tạo cho nhân viên từ xa.

Đầu tiên, hãy tạo một đối tượng ClusterCoordinator và chuyển đối tượng chiến lược vào:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Sau đó, tạo một tập dữ liệu cho mỗi công nhân và một trình vòng lặp. Trong per_worker_dataset_fn dưới đây, nên gói dataset_fn vào strategy.distribute_datasets_from_function Chức năng để cho phép tìm nạp trước hiệu quả vào GPU một cách liền mạch.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Bước cuối cùng là phân phối tính toán cho nhân viên từ xa bằng cách sử dụng ClusterCoordinator.schedule :

  • Phương thức schedule hàng một hàm tf. và trả về một tf.function RemoteValue trong tương lai ngay lập tức. Các chức năng được xếp hàng đợi sẽ được gửi đến nhân viên từ xa trong các luồng nền và RemoteValue sẽ được điền không đồng bộ.
  • Phương thức join ( ClusterCoordinator.join ) có thể được sử dụng để đợi cho đến khi tất cả các hàm đã lên lịch được thực thi.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Đây là cách bạn có thể tìm nạp kết quả của một RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Ngoài ra, bạn có thể khởi chạy tất cả các bước và làm điều gì đó trong khi chờ hoàn thành:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Để có quy trình đào tạo và phục vụ hoàn chỉnh cho ví dụ cụ thể này, vui lòng xem bài kiểm tra này.

Thông tin thêm về tạo tập dữ liệu

Tập dữ liệu trong đoạn mã trên được tạo bằng API ClusterCoordinator.create_per_worker_dataset ). Nó tạo một tập dữ liệu cho mỗi công nhân và trả về một đối tượng vùng chứa. Bạn có thể gọi phương thức iter trên đó để tạo một trình vòng lặp cho mỗi công nhân. Trình vòng lặp cho mỗi công nhân chứa một trình lặp cho mỗi công nhân và lát tương ứng của một công nhân sẽ được thay thế trong đối số đầu vào của hàm được truyền tới phương thức ClusterCoordinator.schedule trước khi hàm được thực thi trên một công nhân cụ thể.

Hiện tại, phương thức ClusterCoordinator.schedule giả định các worker là tương đương và do đó giả định các tập dữ liệu trên các worker khác nhau là giống nhau ngoại trừ chúng có thể bị xáo trộn khác nhau nếu chúng chứa một thao tác Dataset.shuffle . Do đó, chúng tôi cũng khuyến nghị rằng các tập dữ liệu được lặp lại vô thời hạn và bạn lập lịch cho một số bước hữu hạn thay vì dựa vào OutOfRangeError từ một tập dữ liệu.

Một lưu ý quan trọng khác là tập dữ liệu tf.data không hỗ trợ tuần tự hóa ngầm định và giải mã hóa trên các ranh giới nhiệm vụ. Vì vậy, điều quan trọng là phải tạo toàn bộ tập dữ liệu bên trong hàm được chuyển đến ClusterCoordinator.create_per_worker_dataset .

Đánh giá

Có nhiều cách để xác định và chạy một vòng lặp đánh giá trong đào tạo phân tán. Mỗi loại đều có ưu và nhược điểm riêng như được mô tả bên dưới. Phương pháp đánh giá nội tuyến được khuyến nghị nếu bạn không có sở thích.

Đánh giá nội tuyến

Trong phương pháp này, người điều phối luân phiên giữa đào tạo và đánh giá và do đó nó được gọi là đánh giá nội tuyến .

Có một số lợi ích của việc đánh giá nội tuyến. Ví dụ:

  • Nó có thể hỗ trợ các mô hình đánh giá lớn và bộ dữ liệu đánh giá mà một tác vụ đơn lẻ không thể chứa được.
  • Kết quả đánh giá có thể được sử dụng để đưa ra quyết định đào tạo cho kỷ nguyên tiếp theo.

Có hai cách để thực hiện đánh giá nội tuyến: đánh giá trực tiếp và đánh giá phân tán.

  • Đánh giá trực tiếp : Đối với các mô hình nhỏ và tập dữ liệu đánh giá, điều phối viên có thể chạy đánh giá trực tiếp trên mô hình phân tán với tập dữ liệu đánh giá trên điều phối viên:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Đánh giá phân tán : Đối với các mô hình lớn hoặc bộ dữ liệu không thể chạy trực tiếp trên điều phối viên, nhiệm vụ điều phối viên có thể phân phối các nhiệm vụ đánh giá cho nhân viên thông qua các phương thức ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Đánh giá bên hông xe

Một phương pháp khác được gọi là đánh giá bên xe , trong đó bạn tạo một nhiệm vụ chuyên gia đánh giá liên tục đọc các điểm kiểm tra và chạy đánh giá trên một điểm kiểm tra mới nhất. Nó cho phép chương trình đào tạo của bạn kết thúc sớm nếu bạn không cần thay đổi vòng lặp đào tạo của mình dựa trên kết quả đánh giá. Tuy nhiên, nó yêu cầu một nhiệm vụ người đánh giá bổ sung và điểm kiểm tra định kỳ để kích hoạt đánh giá. Sau đây là một vòng đánh giá bên xe có thể có:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Các cụm trong thế giới thực

Trong môi trường sản xuất thực tế, bạn sẽ chạy tất cả các tác vụ trong các quy trình khác nhau trên các máy khác nhau. Cách đơn giản nhất để định cấu hình thông tin cụm trên mỗi tác vụ là đặt các biến môi trường "TF_CONFIG" và sử dụng tf.distribute.cluster_resolver.TFConfigClusterResolver để phân tích cú pháp "TF_CONFIG" .

Để biết mô tả chung về các biến môi trường "TF_CONFIG" , hãy tham khảo Hướng dẫn đào tạo phân tán .

Nếu bạn bắt đầu nhiệm vụ đào tạo của mình bằng Kubernetes hoặc các mẫu cấu hình khác, rất có thể các mẫu này đã đặt “TF_CONFIG" cho bạn.

Đặt biến môi trường "TF_CONFIG"

Giả sử bạn có 3 công nhân và 2 máy chủ tham số, "TF_CONFIG" của công nhân 1 có thể là:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" của người đánh giá có thể là:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Phần "cluster" trong chuỗi "TF_CONFIG" ở trên cho người đánh giá là tùy chọn.

Nếu bạn sử dụng cùng một tệp nhị phân cho tất cả các tác vụ

Nếu bạn muốn chạy tất cả các tác vụ này bằng cách sử dụng một tệp nhị phân duy nhất, bạn sẽ cần để chương trình của mình phân nhánh thành các vai trò khác nhau ngay từ đầu:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Đoạn mã sau khởi động máy chủ TensorFlow và đợi:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Xử lý lỗi tác vụ

Công nhân thất bại

tf.distribute.experimental.coordinator.ClusterCoordinator hoặc Model.fit cung cấp khả năng chịu lỗi tích hợp đối với lỗi của nhân viên. Sau khi phục hồi công nhân, chức năng tập dữ liệu đã cung cấp trước đó (tới ClusterCoordinator.create_per_worker_dataset cho vòng lặp đào tạo tùy chỉnh hoặc tf.keras.utils.experimental.DatasetCreator cho Model.fit ) sẽ được gọi trên công nhân để tạo lại tập dữ liệu.

Máy chủ tham số hoặc bộ điều phối không thành công

Tuy nhiên, khi điều phối viên thấy lỗi máy chủ tham số, nó sẽ xuất hiện lỗi UnavailableError hoặc AbortedError ngay lập tức. Bạn có thể khởi động lại bộ điều phối trong trường hợp này. Bản thân điều phối viên cũng có thể trở nên không khả dụng. Do đó, nên sử dụng một số công cụ nhất định để không làm mất tiến độ đào tạo:

  • Đối với Model.fit , bạn nên sử dụng lệnh gọi lại BackupAndRestore để xử lý quá trình lưu và khôi phục tiến trình tự động. Xem phần Gọi lại và đào tạo ở trên để biết ví dụ.

  • Đối với vòng lặp đào tạo tùy chỉnh, bạn nên kiểm tra các biến mô hình theo định kỳ và tải các biến mô hình từ một điểm kiểm tra, nếu có, trước khi bắt đầu đào tạo. Tiến trình đào tạo có thể được suy ra gần đúng từ optimizer.iterations nếu trình tối ưu hóa được kiểm tra:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Tìm nạp RemoteValue

Việc tìm nạp một RemoteValue được đảm bảo sẽ thành công nếu một hàm được thực thi thành công. Điều này là do hiện tại giá trị trả về được sao chép ngay lập tức tới bộ điều phối sau khi một hàm được thực thi. Nếu có bất kỳ lỗi worker nào trong quá trình sao chép, chức năng sẽ được thử lại trên một worker hiện có khác. Do đó, nếu bạn muốn tối ưu hóa hiệu suất, bạn có thể lập lịch các hàm mà không có giá trị trả về.

Báo cáo lỗi

Khi điều phối viên gặp lỗi như UnavailableError từ các máy chủ tham số hoặc các lỗi ứng dụng khác chẳng hạn như Lỗi đối số không hợp tf.debugging.check_numerics InvalidArgument điều phối viên sẽ hủy tất cả các chức năng đang chờ xử lý và xếp hàng đợi trước khi phát sinh lỗi. Tìm nạp các RemoteValue tương ứng của chúng sẽ gây ra lỗi CancelledError .

Sau khi một lỗi được nêu ra, điều phối viên sẽ không đưa ra cùng một lỗi hoặc bất kỳ lỗi nào từ các chức năng bị hủy.

Cải thiện hiệu suất

Có một số lý do có thể xảy ra nếu bạn gặp vấn đề về hiệu suất khi đào tạo với ParameterServerStrategyClusterResolver .

Một lý do phổ biến là máy chủ tham số có tải không cân bằng và một số máy chủ tham số được tải nặng đã đạt đến dung lượng. Cũng có thể có nhiều nguyên nhân gốc rễ. Một số phương pháp đơn giản để giảm thiểu vấn đề này là:

  1. Chia nhỏ các biến mô hình lớn của bạn thông qua việc chỉ định một variable_partitioner khi xây dựng một ParameterServerStrategy .
  2. Tránh tạo biến điểm phát sóng được yêu cầu bởi tất cả các máy chủ tham số trong một bước duy nhất nếu có thể. Ví dụ: sử dụng tốc độ học không đổi hoặc lớp con tf.keras.optimizers.schedules.LearningRateSchedule trong trình tối ưu hóa vì hành vi mặc định là tốc độ học tập sẽ trở thành một biến được đặt trên một máy chủ tham số cụ thể và được yêu cầu bởi tất cả các máy chủ tham số khác trong mỗi bước .
  3. Xáo trộn các từ vựng lớn của bạn trước khi chuyển chúng đến các lớp tiền xử lý của Keras.

Một lý do khác có thể cho các vấn đề về hiệu suất là người điều phối. Việc triển khai schedule / join đầu tiên của bạn dựa trên Python và do đó có thể có chi phí phân luồng. Ngoài ra, độ trễ giữa điều phối viên và công nhân có thể lớn. Nếu đây là trường hợp,

  • Đối với Model.fit , bạn có thể đặt đối số steps_per_execution được cung cấp tại Model.compile thành giá trị lớn hơn 1.

  • Đối với một vòng lặp đào tạo tùy chỉnh, bạn có thể đóng gói nhiều bước vào một tf.function . function duy nhất:

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Vì thư viện được tối ưu hóa hơn nữa, hy vọng rằng hầu hết người dùng sẽ không phải đóng gói các bước theo cách thủ công trong tương lai.

Ngoài ra, một mẹo nhỏ để cải thiện hiệu suất là lên lịch cho các hàm không có giá trị trả về như đã giải thích trong phần xử lý lỗi tác vụ ở trên.

Những hạn chế đã biết

Hầu hết các hạn chế đã biết đã được đề cập trong các phần trên. Phần này cung cấp một bản tóm tắt.

ParameterServerStrategy chung

  • os.environment["grpc_fail_fast"]="use_caller" là cần thiết cho mọi tác vụ bao gồm cả điều phối viên, để làm cho khả năng chịu lỗi hoạt động bình thường.
  • Đào tạo máy chủ tham số đồng bộ không được hỗ trợ.
  • Thông thường cần phải đóng gói nhiều bước vào một chức năng duy nhất để đạt được hiệu suất tối ưu.
  • Nó không được hỗ trợ để tải một save_model qua tf.saved_model.load có chứa các biến phân đoạn. Lưu ý rằng việc tải một save_model như vậy bằng cách sử dụng TensorFlow Serving dự kiến ​​sẽ hoạt động.
  • Nó không được hỗ trợ để tải một điểm kiểm tra có chứa các biến vị trí trình tối ưu hóa phân đoạn vào một số lượng phân đoạn khác nhau.
  • Nó không được hỗ trợ để khôi phục sau lỗi máy chủ tham số mà không khởi động lại tác vụ điều phối viên.
  • Việc sử dụng tf.lookup.StaticHashTable (thường được sử dụng bởi một số lớp tiền xử lý Keras, chẳng hạn như tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookuptf.keras.layers.TextVectorization ) dẫn đến tài nguyên được đặt trên điều phối viên tại thời điểm này với đào tạo máy chủ tham số. Điều này có ý nghĩa về hiệu suất đối với các RPC tra cứu từ công nhân đến điều phối viên. Đây là một ưu tiên cao hiện nay cần giải quyết.

Thông tin chi tiết cụ thể Model.fit

  • Đối số steps_per_epoch là bắt buộc trong Model.fit . Bạn có thể chọn một giá trị cung cấp các khoảng thời gian thích hợp trong một kỷ nguyên.
  • ParameterServerStrategy không hỗ trợ các lệnh gọi lại tùy chỉnh có các lệnh gọi cấp hàng loạt vì lý do hiệu suất. Bạn nên chuyển đổi các cuộc gọi đó thành các cuộc gọi cấp kỷ nguyên với các steps_per_epoch được chọn phù hợp, để chúng được gọi là mỗi số steps_per_epoch . Các lệnh gọi lại tích hợp không bị ảnh hưởng: các lệnh gọi cấp độ hàng loạt của chúng đã được sửa đổi để hoạt động tốt. Hỗ trợ các cuộc gọi cấp lô cho ParameterServerStrategy đang được lên kế hoạch.
  • Vì lý do tương tự, không giống như các chiến lược khác, thanh tiến trình và số liệu chỉ được ghi lại ở ranh giới kỷ nguyên.
  • run_eagerly không được hỗ trợ.

Thông tin cụ thể về vòng đào tạo tùy chỉnh