Trả lời cho sự kiện TensorFlow Everywhere tại địa phương của bạn ngay hôm nay!
Trang này được dịch bởi Cloud Translation API.
Switch to English

Đào tạo nhiều nhân viên với Keras

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ tay

Tổng quat

Hướng dẫn này trình bày đào tạo phân tán đa nhân viên với mô hình tf.distribute.Strategy sử dụng API tf.distribute.Strategy , cụ thể là tf.distribute.MultiWorkerMirroredStrategy . Với sự trợ giúp của chiến lược này, mô hình Keras được thiết kế để chạy trên một nhân viên có thể làm việc liền mạch trên nhiều nhân viên với sự thay đổi mã tối thiểu.

Hướng dẫn Đào tạo Phân tán trong TensorFlow có sẵn để có cái nhìn tổng quan về các chiến lược phân phối mà TensorFlow hỗ trợ cho những người quan tâm đến việc hiểu sâu hơn về các API tf.distribute.Strategy .

Thiết lập

Đầu tiên, một số nhập khẩu cần thiết.

import json
import os
import sys

Trước khi nhập TensorFlow, hãy thực hiện một vài thay đổi đối với môi trường.

Tắt tất cả các GPU. Điều này ngăn ngừa các lỗi gây ra bởi tất cả các công nhân đang cố gắng sử dụng cùng một GPU. Đối với một ứng dụng thực, mỗi công nhân sẽ ở trên một máy khác nhau.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Đặt lại biến môi trường TF_CONFIG , bạn sẽ thấy thêm về điều này sau.

os.environ.pop('TF_CONFIG', None)

Đảm bảo rằng thư mục hiện tại nằm trên đường dẫn của python. Điều này cho phép sổ ghi chép nhập các tệp được ghi bởi %%writefile sau này.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Bây giờ nhập TensorFlow.

import tensorflow as tf

Tập dữ liệu và định nghĩa mô hình

Tiếp theo, tạo tệp mnist.py với thiết lập mô hình và tập dữ liệu đơn giản. Tệp python này sẽ được sử dụng bởi worker-process trong hướng dẫn này:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

Hãy thử đào tạo mô hình cho một số ít kỷ nguyên và quan sát kết quả của một công nhân để đảm bảo mọi thứ hoạt động chính xác. Khi quá trình đào tạo tiến triển, tổn thất sẽ giảm xuống và độ chính xác sẽ tăng lên.

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2959 - accuracy: 0.0977
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2311 - accuracy: 0.2726
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1668 - accuracy: 0.4236

<tensorflow.python.keras.callbacks.History at 0x7f62c6ec0780>

Cấu hình nhiều nhân viên

Bây giờ chúng ta hãy bước vào thế giới đào tạo nhiều nhân viên. Trong TensorFlow, biến môi trường TF_CONFIG là bắt buộc để huấn luyện trên nhiều máy, mỗi máy có thể có một vai trò khác nhau. TF_CONFIG là một chuỗi JSON được sử dụng để chỉ định cấu hình cụm trên mỗi worker là một phần của cụm.

Đây là một cấu hình ví dụ:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Đây là cùng một TF_CONFIG được tuần tự hóa dưới dạng chuỗi JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Có hai thành phần của TF_CONFIG : clustertask .

  • cluster giống nhau đối với tất cả công nhân và cung cấp thông tin về nhóm đào tạo, là một mệnh lệnh bao gồm các loại công việc khác nhau như worker . Trong quá trình đào tạo nhiều nhân viên với MultiWorkerMirroredStrategy , thường có một worker đảm nhận nhiều trách nhiệm hơn một chút như lưu điểm kiểm tra và viết tệp tóm tắt cho TensorBoard ngoài những gì một worker thông thường làm. Một nhân viên như vậy được gọi là chief công nhân, và nó là phong tục mà worker với index 0 được bổ nhiệm làm giám đốc worker (trên thực tế đây là cách tf.distribute.Strategy được thực hiện).

  • task cung cấp thông tin của task hiện tại và khác nhau trên từng worker. Nó chỉ định typeindex của công nhân đó.

Trong ví dụ này, bạn đặt type nhiệm vụ thành "worker"index nhiệm vụ thành 0 . Máy này là công nhân đầu tiên và sẽ được chỉ định làm công nhân chính và làm nhiều việc hơn các máy khác. Lưu ý rằng các máy khác cũng sẽ cần phải đặt biến môi trường TF_CONFIG và nó phải có cùng một cluster , nhưng type tác vụ hoặc index tác vụ khác nhau tùy thuộc vào vai trò của các máy đó.

Với mục đích minh họa, hướng dẫn này cho thấy cách một người có thể đặt một TF_CONFIG với 2 công nhân trên localhost . Trên thực tế, người dùng sẽ tạo nhiều công nhân trên các địa chỉ / cổng IP bên ngoài và đặt TF_CONFIG trên mỗi công nhân một cách thích hợp.

Trong ví dụ này, bạn sẽ sử dụng 2 công nhân, TF_CONFIG của công nhân đầu tiên được hiển thị ở trên. Đối với công nhân thứ hai, bạn sẽ đặt tf_config['task']['index']=1

Ở trên, tf_config chỉ là một biến cục bộ trong python. Để thực sự sử dụng nó để định cấu hình đào tạo, từ điển này cần được tuần tự hóa dưới dạng JSON và được đặt trong biến môi trường TF_CONFIG .

Biến môi trường và quy trình con trong sổ ghi chép

Các quy trình con kế thừa các biến môi trường từ cha của chúng. Vì vậy, nếu bạn đặt một biến môi trường trong quy trình jupyter notebook này:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Bạn có thể truy cập biến môi trường từ một quy trình con:

echo ${GREETINGS}
Hello TensorFlow!

Trong phần tiếp theo, bạn sẽ sử dụng điều này để chuyển TF_CONFIG cho các quy trình con của worker. Bạn sẽ không bao giờ thực sự khởi động công việc của mình theo cách này, nhưng nó đủ cho các mục đích của hướng dẫn này: Để minh họa một ví dụ tối thiểu về nhiều nhân viên.

Chọn chiến lược phù hợp

Trong TensorFlow có hai hình thức đào tạo phân tán chính:

  • Đào tạo đồng bộ, trong đó các bước đào tạo được đồng bộ hóa giữa các công nhân và bản sao, và
  • Đào tạo không đồng bộ, trong đó các bước đào tạo không được đồng bộ hóa nghiêm ngặt.

MultiWorkerMirroredStrategy , là chiến lược được khuyến nghị để đào tạo đồng bộ nhiều nhân viên, sẽ được trình bày trong hướng dẫn này. Để đào tạo mô hình, hãy sử dụng một phiên bản của tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy tạo bản sao của tất cả các biến trong các lớp của mô hình trên mỗi thiết bị trên tất cả các công nhân. Nó sử dụng CollectiveOps , một tùy chọn TensorFlow để giao tiếp tập thể, để tổng hợp các gradient và giữ cho các biến được đồng bộ hóa. Hướng dẫn tf.distribute.Strategy có thêm chi tiết về chiến lược này.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy cung cấp nhiều triển khai thông qua tham số CommunicationOptions . RING triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp giữa các máy chủ. NCCL sử dụng NCCL của Nvidia để thực hiện các tập thể. AUTO định nghĩa sự lựa chọn trong thời gian chạy. Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng và loại GPU cũng như kết nối mạng trong cụm.

Đào tạo mô hình

Với việc tích hợp API tf.distribute.Strategy vào tf.keras , thay đổi duy nhất bạn sẽ thực hiện để phân phối đào tạo cho nhiều nhân viên là bao gồm việc xây dựng mô hình và cuộc gọi model.compile() bên trong strategy.scope() model.compile() strategy.scope() . Phạm vi của chiến lược phân phối quy định cách thức và vị trí các biến được tạo, và trong trường hợp của MultiWorkerMirroredStrategy , các biến được tạo là MirroredVariable s và chúng được sao chép trên từng công nhân.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

Để thực sự chạy với MultiWorkerMirroredStrategy bạn sẽ cần chạy các quy trình công nhân và chuyển TF_CONFIG cho chúng.

Giống như tệp mnist.py được viết trước đó, đây là main.py mà mỗi công nhân sẽ chạy:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Trong đoạn mã ở trên, lưu ý rằng global_batch_size , được chuyển đến Dataset.batch , được đặt thành per_worker_batch_size * num_workers . Điều này đảm bảo rằng mỗi công nhân xử lý hàng loạt ví dụ per_worker_batch_size bất kể số lượng công nhân.

Thư mục hiện tại chứa cả hai tệp Python:

ls *.py
main.py
mnist.py

Vì vậy, json-serialize TF_CONFIG và thêm nó vào các biến môi trường:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Bây giờ, bạn có thể khởi chạy một quy trình công nhân sẽ chạy main.py và sử dụng TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.

python main.py &> job_0.log

Có một số điều cần lưu ý về lệnh trên:

  1. Nó sử dụng %%bash là một "phép thuật" sổ ghi chép để chạy một số lệnh bash.
  2. Nó sử dụng cờ --bg để chạy quá trình bash trong nền, bởi vì công nhân này sẽ không kết thúc. Nó đợi tất cả công nhân trước khi bắt đầu.

Quá trình công nhân nền sẽ không in kết quả ra sổ tay này, vì vậy &> chuyển hướng đầu ra của nó thành một tệp, vì vậy bạn có thể xem điều gì đã xảy ra.

Vì vậy, hãy đợi vài giây để quá trình bắt đầu:

import time
time.sleep(10)

Bây giờ hãy xem những gì đã được xuất ra logfile của worker cho đến nay:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

Dòng cuối cùng của tệp nhật ký sẽ có nội dung: Started server with target: grpc://localhost:12345 . Công nhân đầu tiên hiện đã sẵn sàng và đang đợi tất cả (các) công nhân khác sẵn sàng để tiếp tục.

Vì vậy, hãy cập nhật tf_config cho quy trình của worker thứ hai để nhận:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Bây giờ khởi chạy công nhân thứ hai. Điều này sẽ bắt đầu đào tạo vì tất cả công nhân đều đang hoạt động (vì vậy không cần phải làm nền cho quá trình này):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

2021-02-23 02:20:43.794926: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:45.375845: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:45.376779: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:46.347650: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:46.347716: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347726: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347847: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:46.347887: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:46.347898: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:46.348715: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:46.349096: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.349700: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.353497: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:46.353936: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-02-23 02:20:47.285814: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.507974: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508360: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz

Bây giờ nếu bạn kiểm tra lại nhật ký được viết bởi công nhân đầu tiên, bạn sẽ thấy rằng nó đã tham gia đào tạo mô hình đó:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-02-23 02:20:47.286117: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.508657: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508964: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

Không có gì ngạc nhiên khi điều này chạy chậm hơn so với quá trình chạy thử nghiệm ở phần đầu của hướng dẫn này. Chạy nhiều công nhân trên một máy chỉ làm tăng thêm chi phí. Mục tiêu ở đây không phải là cải thiện thời gian đào tạo, mà chỉ đưa ra một ví dụ về đào tạo nhiều công nhân.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Đào tạo chuyên sâu cho nhiều công nhân

Cho đến nay, hướng dẫn này đã chứng minh một thiết lập đa nhân viên cơ bản. Phần còn lại của tài liệu này xem xét chi tiết các yếu tố khác có thể hữu ích hoặc quan trọng cho các trường hợp sử dụng thực tế.

Sharding tập dữ liệu

Trong quá trình đào tạo nhiều nhân viên, phân tích tập dữ liệu là cần thiết để đảm bảo sự hội tụ và hiệu suất.

Ví dụ trong phần trước dựa trên tự động sạc mặc định được cung cấp bởi API tf.distribute.Strategy . Bạn có thể kiểm soát sharding bằng cách đặt tf.data.experimental.AutoShardPolicy của tf.data.experimental.DistributeOptions . Để tìm hiểu thêm về tính năng tự động làm sắc nét, hãy xem hướng dẫn Nhập phân tán .

Dưới đây là một ví dụ nhanh về cách TẮT tính năng tự động mài sắc nét, vì vậy mỗi bản sao sẽ xử lý mọi ví dụ (không được khuyến nghị):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Đánh giá

Nếu bạn chuyển validation_data vào model.fit , nó sẽ luân phiên giữa đào tạo và đánh giá cho mỗi kỷ nguyên. Đánh giá lấy dữ liệu validation_data được phân phối trên cùng một nhóm công nhân và kết quả đánh giá được tổng hợp và có sẵn cho tất cả công nhân. Tương tự như đào tạo, tập dữ liệu xác thực được tự động chia nhỏ ở cấp tệp. Bạn cần đặt kích thước lô chung trong tập dữ liệu xác thực và đặt bước validation_steps . Một tập dữ liệu lặp lại cũng được khuyến nghị để đánh giá.

Ngoài ra, bạn cũng có thể tạo một tác vụ khác định kỳ đọc các điểm kiểm tra và chạy đánh giá. Đây là những gì Công cụ ước tính làm. Nhưng đây không phải là cách được khuyến nghị để thực hiện đánh giá và do đó các chi tiết của nó bị bỏ qua.

Hiệu suất

Bây giờ bạn có một mô hình Keras đã được thiết lập để chạy trong nhiều công nhân với MultiWorkerMirroredStrategy . Bạn có thể thử các kỹ thuật sau để điều chỉnh hiệu suất của đào tạo nhiều nhân viên với MultiWorkerMirroredStrategy .

  • MultiWorkerMirroredStrategy cung cấp nhiều triển khai giao tiếp tập thể . RING triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp giữa các máy chủ. NCCL sử dụng NCCL của Nvidia để thực hiện các tập thể. AUTO xác định sự lựa chọn trong thời gian chạy. Lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng và loại GPU cũng như kết nối mạng trong cụm. Để ghi đè lựa chọn tự động, chỉ định communication_options tham số của MultiWorkerMirroredStrategy constructor 's, ví dụ như communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) .
  • Truyền các biến tới tf.float nếu có thể. Mô hình ResNet chính thức bao gồm một ví dụ về cách có thể thực hiện điều này.

Khả năng chịu lỗi

Trong đào tạo đồng bộ, cụm sẽ thất bại nếu một trong các công nhân bị lỗi và không tồn tại cơ chế khắc phục lỗi. Sử dụng Keras với tf.distribute.Strategy có lợi thế về khả năng chịu lỗi trong trường hợp công nhân chết hoặc không ổn định. Bạn thực hiện việc này bằng cách duy trì trạng thái đào tạo trong hệ thống tệp phân tán mà bạn chọn, sao cho khi khởi động lại phiên bản mà trước đó không thành công hoặc được sử dụng trước, trạng thái đào tạo được phục hồi.

Khi một nhân viên không có mặt, những nhân viên khác sẽ không thực hiện được (có thể sau khi hết thời gian chờ). Trong những trường hợp như vậy, cần khởi động lại công nhân không khả dụng, cũng như các công nhân khác đã bị lỗi.

Gọi lại ModelCheckpoint

ModelCheckpoint gọi lại ModelCheckpoint không còn cung cấp chức năng chịu lỗi nữa, thay vào đó hãy sử dụng BackupAndRestore gọi lại BackupAndRestore .

ModelCheckpoint gọi lại ModelCheckpoint vẫn có thể được sử dụng để lưu các điểm kiểm tra. Nhưng với điều này, nếu quá trình đào tạo bị gián đoạn hoặc kết thúc thành công, để tiếp tục đào tạo từ điểm kiểm tra, người dùng có trách nhiệm tải mô hình theo cách thủ công.

Tùy ý người dùng có thể chọn lưu và khôi phục mô hình / trọng số bên ngoài ModelCheckpoint gọi lại ModelCheckpoint .

Lưu và tải mô hình

Để lưu mô hình của bạn bằng cách sử dụng model.save hoặc tf.saved_model.save , đích để lưu cần phải khác nhau đối với mỗi nhân viên. Đối với những người không phải là công nhân chính, bạn sẽ cần lưu mô hình vào một thư mục tạm thời và trên công nhân trưởng, bạn sẽ cần lưu vào thư mục mô hình được cung cấp. Các thư mục tạm thời trên worker cần phải là duy nhất để tránh lỗi do nhiều worker đang cố gắng ghi vào cùng một vị trí. Mô hình được lưu trong tất cả các thư mục là giống nhau và thường chỉ mô hình được lưu bởi trưởng mới được tham chiếu để khôi phục hoặc phục vụ. Bạn nên có một số logic dọn dẹp để xóa các thư mục tạm thời được tạo bởi công nhân khi quá trình đào tạo của bạn đã hoàn thành.

Lý do bạn cần đồng thời tiết kiệm trưởng và công nhân là vì bạn có thể tổng hợp các biến trong quá trình kiểm tra, điều này yêu cầu cả trưởng và công nhân tham gia vào giao thức giao tiếp allreduce. Mặt khác, việc để trưởng và công nhân lưu vào cùng một thư mục mô hình sẽ dẫn đến sai sót do tranh chấp.

Với MultiWorkerMirroredStrategy , chương trình được chạy trên mọi worker và để biết liệu worker hiện tại có phải là trưởng hay không, nó tận dụng đối tượng trình phân giải cụm có các thuộc tính task_typetask_id . task_type cho bạn biết công việc hiện tại là gì (ví dụ: 'worker') và task_id cho bạn biết mã định danh của worker. Công nhân có id 0 được chỉ định là công nhân chính.

Trong đoạn mã bên dưới, write_filepath cung cấp đường dẫn tệp để ghi, đường dẫn này phụ thuộc vào id công nhân. Trong trường hợp của trưởng (công nhân có id 0), nó ghi vào đường dẫn tệp gốc; đối với những người khác, nó tạo một thư mục tạm thời (có id trong đường dẫn thư mục) để ghi vào:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Với điều đó, bây giờ bạn đã sẵn sàng để lưu:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Như đã mô tả ở trên, về sau, mô hình chỉ nên được tải từ đường dẫn trưởng được lưu vào, vì vậy hãy xóa các mô hình tạm thời mà không phải nhân viên chính đã lưu:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Bây giờ, khi đã đến lúc tải, hãy sử dụng API tf.keras.models.load_model tiện lợi và tiếp tục với công việc tiếp theo. Ở đây, giả sử chỉ sử dụng một worker duy nhất để tải và tiếp tục đào tạo, trong trường hợp đó, bạn không gọi tf.keras.models.load_model trong một strategy.scope() tf.keras.models.load_model strategy.scope() .

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 13ms/step - loss: 2.3041 - accuracy: 7.8125e-04
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2873 - accuracy: 0.0023

<tensorflow.python.keras.callbacks.History at 0x7f62c4ef5048>

Lưu và khôi phục điểm kiểm tra

Mặt khác, kiểm tra cho phép bạn lưu trọng lượng của mô hình và khôi phục chúng mà không cần phải lưu toàn bộ mô hình. Tại đây, bạn sẽ tạo một tf.train.Checkpoint theo dõi mô hình, được quản lý bởi tf.train.CheckpointManager để chỉ có điểm kiểm tra mới nhất được giữ nguyên.

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Sau khi thiết lập CheckpointManager , bạn đã sẵn sàng lưu và xóa các trạm kiểm soát không phải là nhân viên chính đã lưu.

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Bây giờ, khi bạn cần khôi phục, bạn có thể tìm thấy điểm kiểm tra mới nhất được lưu bằng chức năng tf.train.latest_checkpoint tiện lợi. Sau khi khôi phục điểm kiểm tra, bạn có thể tiếp tục đào tạo.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.3050 - accuracy: 0.0920
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2963 - accuracy: 0.0896

<tensorflow.python.keras.callbacks.History at 0x7f62c44a2710>

Gọi lại BackupAndRestore

Gọi lại BackupAndRestore cung cấp chức năng chịu lỗi, bằng cách sao lưu mô hình và số kỷ nguyên hiện tại trong một tệp điểm kiểm tra tạm thời dưới đối số backup_dir cho BackupAndRestore . Điều này được thực hiện vào cuối mỗi kỷ nguyên.

Khi công việc bị gián đoạn và khởi động lại, lệnh gọi lại khôi phục điểm kiểm tra cuối cùng và quá trình đào tạo tiếp tục từ đầu kỷ nguyên bị gián đoạn. Bất kỳ quá trình đào tạo từng phần nào đã được thực hiện trong kỷ nguyên chưa hoàn thành trước khi bị gián đoạn sẽ bị loại bỏ, để nó không ảnh hưởng đến trạng thái mô hình cuối cùng.

Để sử dụng nó, hãy cung cấp một phiên bản của tf.keras.callbacks.experimental.BackupAndRestore tại lệnh tf.keras.Model.fit() .

Với MultiWorkerMirroredStrategy, nếu một worker bị gián đoạn, toàn bộ cụm sẽ tạm dừng cho đến khi khởi động lại worker bị gián đoạn. Các công nhân khác cũng sẽ khởi động lại và công nhân bị gián đoạn tham gia lại cụm. Sau đó, mọi công nhân đọc tệp điểm kiểm tra đã được lưu trước đó và chọn trạng thái cũ của nó, do đó cho phép cụm đồng bộ trở lại. Sau đó, việc đào tạo tiếp tục.

BackupAndRestore lại BackupAndRestore sử dụng CheckpointManager để lưu và khôi phục trạng thái đào tạo, tạo một tệp có tên là điểm kiểm tra theo dõi các điểm kiểm tra hiện có cùng với điểm kiểm tra mới nhất. Vì lý do này, backup_dir không nên được sử dụng lại để lưu trữ các trạm kiểm soát khác nhằm tránh xung đột tên.

Hiện tại, gọi lại BackupAndRestore hỗ trợ một nhân viên không có chiến lược, MirroredStrategy và nhiều nhân viên với MultiWorkerMirroredStrategy. Dưới đây là hai ví dụ cho cả đào tạo nhiều công nhân và đào tạo một công nhân.

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2930 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2467 - accuracy: 0.2765
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1963 - accuracy: 0.3645

<tensorflow.python.keras.callbacks.History at 0x7f62c4371390>

Nếu bạn kiểm tra thư mục backup_dir mà bạn đã chỉ định trong BackupAndRestore , bạn có thể nhận thấy một số tệp điểm kiểm tra được tạo tạm thời. Các tệp đó cần thiết để khôi phục các phiên bản đã mất trước đó và chúng sẽ bị thư viện xóa vào cuối tf.keras.Model.fit() khi thoát thành công khóa đào tạo của bạn.

Xem thêm

  1. Hướng dẫn Đào tạo Phân tán trong TensorFlow cung cấp tổng quan về các chiến lược phân phối có sẵn.
  2. Các mô hình chính thức , nhiều mô hình trong số đó có thể được định cấu hình để chạy nhiều chiến lược phân phối.
  3. Phần Hiệu suất trong hướng dẫn cung cấp thông tin về các chiến lược và công cụ khác mà bạn có thể sử dụng để tối ưu hóa hiệu suất của các mô hình TensorFlow của mình.