Đào tạo nhiều nhân viên với Công cụ ước tính

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Tổng quat

Hướng dẫn này trình bày cách sử dụng tf.distribute.Strategy để đào tạo nhiều nhân viên phân tán với tf.estimator . Nếu bạn viết mã của mình bằng tf.estimator và bạn quan tâm đến việc mở rộng ra ngoài một máy duy nhất với hiệu suất cao, thì hướng dẫn này là dành cho bạn.

Trước khi bắt đầu, vui lòng đọc hướng dẫn chiến lược phân phối . Hướng dẫn đào tạo đa GPU cũng có liên quan, vì hướng dẫn này sử dụng cùng một mô hình.

Thành lập

Đầu tiên, thiết lập TensorFlow và các lần nhập cần thiết.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

Chức năng đầu vào

Hướng dẫn này sử dụng tập dữ liệu MNIST từ TensorFlow Datasets . Đoạn mã ở đây tương tự như hướng dẫn đào tạo đa GPU với một điểm khác biệt chính: khi sử dụng Công cụ ước tính để đào tạo nhiều nhân viên, cần phải chia nhỏ tập dữ liệu theo số lượng nhân viên để đảm bảo mô hình hội tụ. Dữ liệu đầu vào được phân đoạn theo chỉ mục worker, để mỗi worker xử lý 1/num_workers các phần riêng biệt của tập dữ liệu.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

Một cách tiếp cận hợp lý khác để đạt được sự hội tụ là xáo trộn tập dữ liệu với các hạt giống riêng biệt ở mỗi công nhân.

Cấu hình nhiều nhân viên

Một trong những điểm khác biệt chính trong hướng dẫn này (so với hướng dẫn đào tạo đa GPU ) là thiết lập nhiều nhân viên. Biến môi trường TF_CONFIG là cách tiêu chuẩn để chỉ định cấu hình cụm cho mỗi worker là một phần của cụm.

Có hai thành phần của TF_CONFIG : clustertask . cluster cung cấp thông tin về toàn bộ cluster, cụ thể là các worker và các máy chủ tham số trong cluster. task cung cấp thông tin về nhiệm vụ hiện tại. cluster thành phần đầu tiên giống nhau đối với tất cả công nhân và máy chủ tham số trong cụm và task thành phần thứ hai khác nhau trên mỗi công nhân và máy chủ tham số và chỉ định typeindex riêng của nó. Trong ví dụ này, type tác vụ là workerindex tác vụ là 0 .

Với mục đích minh họa, hướng dẫn này chỉ ra cách đặt TF_CONFIG với 2 worker trên localhost . Trong thực tế, bạn sẽ tạo nhiều công nhân trên một địa chỉ IP và cổng bên ngoài, đồng thời đặt TF_CONFIG trên mỗi công nhân một cách thích hợp, tức là sửa đổi index nhiệm vụ.

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Xác định mô hình

Viết các lớp, trình tối ưu hóa và hàm mất để đào tạo. Hướng dẫn này xác định mô hình bằng các lớp Keras, tương tự như hướng dẫn đào tạo đa GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

Để đào tạo mô hình, hãy sử dụng một phiên bản của tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy tạo bản sao của tất cả các biến trong các lớp của mô hình trên mỗi thiết bị trên tất cả các công nhân. Nó sử dụng CollectiveOps , một tùy chọn TensorFlow để giao tiếp tập thể, để tổng hợp các gradient và giữ cho các biến được đồng bộ hóa. Hướng dẫn tf.distribute.Strategy có thêm chi tiết về chiến lược này.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Đào tạo và đánh giá mô hình

Tiếp theo, chỉ định chiến lược phân phối trong RunConfig cho công cụ ước tính, đào tạo và đánh giá bằng cách gọi tf.estimator.train_and_evaluate . Hướng dẫn này chỉ phân phối đào tạo bằng cách chỉ định chiến lược qua train_distribute . Cũng có thể phân phối đánh giá thông qua eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
use `update_config_proto` instead.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.10881.
INFO:tensorflow:Loss for final step: 1.10881.
({'loss': 2.234131, 'global_step': 938}, [])

Tối ưu hóa hiệu suất đào tạo

Bây giờ bạn có một mô hình và một Công cụ ước tính có nhiều nhân viên được hỗ trợ bởi tf.distribute.Strategy . Bạn có thể thử các kỹ thuật sau để tối ưu hóa hiệu suất của đào tạo nhiều nhân viên:

  • Tăng kích thước lô: Kích thước lô được chỉ định ở đây là trên mỗi GPU. Nói chung, kích thước lô lớn nhất phù hợp với bộ nhớ GPU được khuyến khích.
  • Truyền biến: Truyền các biến tới tf.float nếu có thể. Mô hình ResNet chính thức bao gồm một ví dụ về cách có thể thực hiện điều này.
  • Sử dụng giao tiếp tập thể: MultiWorkerMirroredStrategy cung cấp nhiều triển khai giao tiếp tập thể .

    • RING triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp giữa các máy chủ.
    • NCCL sử dụng NCCL của Nvidia để thực hiện các tập thể.
    • AUTO định nghĩa sự lựa chọn trong thời gian chạy.

    Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng và loại GPU cũng như kết nối mạng trong cụm. Để ghi đè lựa chọn tự động, hãy chỉ định một giá trị hợp lệ cho tham số communication của phương thức MultiWorkerMirroredStrategy , ví dụ: communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

Truy cập phần Hiệu suất trong hướng dẫn để tìm hiểu thêm về các chiến lược và công cụ khác mà bạn có thể sử dụng để tối ưu hóa hiệu suất của các mô hình TensorFlow của mình.

Các ví dụ mã khác

  1. Ví dụ từ đầu đến cuối để đào tạo nhiều nhân viên trong tensorflow / hệ sinh thái bằng cách sử dụng các mẫu Kubernetes. Ví dụ này bắt đầu với mô hình Keras và chuyển đổi nó thành Công cụ ước tính bằng cách sử dụng API tf.keras.estimator.model_to_estimator .
  2. Các mô hình chính thức , nhiều mô hình trong số đó có thể được định cấu hình để chạy nhiều chiến lược phân phối.