Có một câu hỏi? Kết nối với cộng đồng tại Diễn đàn Truy cập Diễn đàn TensorFlow

Đào tạo nhiều nhân viên với Công cụ ước tính

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Tổng quat

Hướng dẫn này trình bày cách sử dụng tf.distribute.Strategy để đào tạo nhiều nhân viên phân tán với tf.estimator . Nếu bạn viết mã của mình bằng tf.estimator và bạn quan tâm đến việc mở rộng ra ngoài một máy duy nhất với hiệu suất cao, thì hướng dẫn này là dành cho bạn.

Trước khi bắt đầu, vui lòng đọc hướng dẫn chiến lược phân phối . Hướng dẫn đào tạo đa GPU cũng có liên quan, vì hướng dẫn này sử dụng cùng một mô hình.

Thiết lập

Đầu tiên, hãy thiết lập TensorFlow và các lần nhập cần thiết.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Chức năng đầu vào

Hướng dẫn này sử dụng tập dữ liệu MNIST từ Tập dữ liệu TensorFlow . Mã ở đây tương tự như hướng dẫn đào tạo đa GPU với một điểm khác biệt chính: khi sử dụng Công cụ ước tính để đào tạo nhiều nhân viên, cần phải chia nhỏ tập dữ liệu theo số lượng công nhân để đảm bảo sự hội tụ của mô hình. Dữ liệu đầu vào được phân đoạn theo chỉ mục worker, để mỗi worker xử lý 1/num_workers các phần riêng biệt của tập dữ liệu.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Một cách tiếp cận hợp lý khác để đạt được sự hội tụ là xáo trộn tập dữ liệu với các hạt giống riêng biệt ở mỗi công nhân.

Cấu hình nhiều nhân viên

Một trong những điểm khác biệt chính trong hướng dẫn này (so với hướng dẫn đào tạo đa GPU ) là thiết lập nhiều nhân viên. TF_CONFIG môi trường TF_CONFIG là cách tiêu chuẩn để chỉ định cấu hình cụm cho mỗi worker là một phần của cụm.

Có hai thành phần của TF_CONFIG : clustertask . cluster cung cấp thông tin về toàn bộ cluster, cụ thể là các worker và các máy chủ tham số trong cluster. task cung cấp thông tin về nhiệm vụ hiện tại. cluster thành phần đầu tiên giống nhau đối với tất cả công nhân và máy chủ tham số trong cụm và task thành phần thứ hai khác nhau trên mỗi công nhân và máy chủ tham số và chỉ định typeindex riêng của nó. Trong ví dụ này, type tác vụ là workerindex tác vụ là 0 .

Với mục đích minh họa, hướng dẫn này chỉ ra cách đặt TF_CONFIG với 2 TF_CONFIG trên localhost . Trong thực tế, bạn sẽ tạo nhiều công nhân trên một địa chỉ IP và cổng bên ngoài, đồng thời đặt TF_CONFIG trên mỗi công nhân một cách thích hợp, tức là sửa đổi index nhiệm vụ.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Xác định mô hình

Viết các lớp, trình tối ưu hóa và hàm mất để đào tạo. Hướng dẫn này xác định mô hình bằng các lớp Keras, tương tự như hướng dẫn đào tạo đa GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Để đào tạo mô hình, hãy sử dụng một phiên bản của tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy tạo bản sao của tất cả các biến trong các lớp của mô hình trên mỗi thiết bị trên tất cả các công nhân. Nó sử dụng CollectiveOps , một tùy chọn TensorFlow để giao tiếp tập thể, để tổng hợp các gradient và giữ cho các biến được đồng bộ hóa. Hướng dẫn tf.distribute.Strategy có thêm chi tiết về chiến lược này.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From <ipython-input-1-f1f424df316e>:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Đào tạo và đánh giá mô hình

Tiếp theo, chỉ định chiến lược phân phối trong RunConfig cho công cụ ước tính, đào tạo và đánh giá bằng cách gọi tf.estimator.train_and_evaluate . Hướng dẫn này chỉ phân phối đào tạo bằng cách chỉ định chiến lược qua train_distribute . Cũng có thể phân phối đánh giá thông qua eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7eff5c6afe50>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:96: DistributedIteratorV1.initialize (from tensorflow.python.distribute.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:loss = 2.305574, step = 0
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:global_step/sec: 208.558
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:loss = 2.3069654, step = 100 (0.482 sec)
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:global_step/sec: 217.154
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:loss = 2.2992344, step = 200 (0.460 sec)
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:global_step/sec: 226.569
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:loss = 2.2939656, step = 300 (0.441 sec)
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:global_step/sec: 308.967
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:loss = 2.3002355, step = 400 (0.324 sec)
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:global_step/sec: 316.892
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:loss = 2.3072734, step = 500 (0.318 sec)
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:global_step/sec: 303.073
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:loss = 2.2884116, step = 600 (0.328 sec)
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:global_step/sec: 318.734
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:loss = 2.2843719, step = 700 (0.313 sec)
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:global_step/sec: 347.186
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:loss = 2.2874813, step = 800 (0.287 sec)
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:global_step/sec: 664.977
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.259765, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Starting evaluation at 2021-06-16T18:35:23
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Inference Time : 1.37506s
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Finished evaluation at 2021-06-16-18:35:25
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.2780812
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.141045.
INFO:tensorflow:Loss for final step: 1.141045.
({'loss': 2.2780812, 'global_step': 938}, [])

Tối ưu hóa hiệu suất đào tạo

Bây giờ bạn có một mô hình và một Công cụ ước tính có nhiều nhân viên được hỗ trợ bởi tf.distribute.Strategy . Bạn có thể thử các kỹ thuật sau để tối ưu hóa hiệu suất của đào tạo nhiều nhân viên:

  • Tăng kích thước lô: Kích thước lô được chỉ định ở đây là trên mỗi GPU. Nói chung, kích thước lô lớn nhất phù hợp với bộ nhớ GPU được khuyến khích.
  • Truyền biến: Truyền các biến tới tf.float nếu có thể. Mô hình ResNet chính thức bao gồm một ví dụ về cách có thể thực hiện điều này.
  • Sử dụng giao tiếp tập thể: MultiWorkerMirroredStrategy cung cấp nhiều triển khai giao tiếp tập thể .

    • RING triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp giữa các máy chủ.
    • NCCL sử dụng NCCL của Nvidia để thực hiện các tập thể.
    • AUTO định nghĩa sự lựa chọn trong thời gian chạy.

    Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng và loại GPU cũng như kết nối mạng trong cụm. Để ghi đè lựa chọn tự động, hãy chỉ định một giá trị hợp lệ cho tham số communication của phương thức MultiWorkerMirroredStrategy , ví dụ: communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Truy cập phần Hiệu suất trong hướng dẫn để tìm hiểu thêm về các chiến lược và công cụ khác mà bạn có thể sử dụng để tối ưu hóa hiệu suất của các mô hình TensorFlow của mình.

Các ví dụ mã khác

  1. Ví dụ từ đầu đến cuối cho việc đào tạo nhiều nhân viên trong tensorflow / hệ sinh thái bằng cách sử dụng các mẫu Kubernetes. Ví dụ này bắt đầu với mô hình Keras và chuyển đổi nó thành Công cụ ước tính bằng cách sử dụng API tf.keras.estimator.model_to_estimator .
  2. Các mô hình chính thức , nhiều mô hình trong số đó có thể được định cấu hình để chạy nhiều chiến lược phân phối.