Đào tạo nhiều nhân viên với Keras

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Tổng quat

Hướng dẫn này trình bày cách thực hiện đào tạo phân tán nhiều nhân viên với mô hình Keras và API Model.fit bằng cách sử dụng API tf.distribute.Strategy — cụ thể là lớp tf.distribute.MultiWorkerMirroredStrategy . Với sự trợ giúp của chiến lược này, mô hình Keras được thiết kế để chạy trên một nhân viên duy nhất có thể làm việc liền mạch trên nhiều nhân viên với những thay đổi mã tối thiểu.

Đối với những người quan tâm đến việc hiểu sâu hơn về các API tf.distribute.Strategy , chương trình đào tạo Phân tán trong hướng dẫn TensorFlow có sẵn để biết tổng quan về các chiến lược phân phối mà TensorFlow hỗ trợ.

Để tìm hiểu cách sử dụng MultiWorkerMirroredStrategy với Keras và vòng đào tạo tùy chỉnh, hãy tham khảo Vòng đào tạo tùy chỉnh với Keras và MultiWorkerMirroredStrategy .

Lưu ý rằng mục đích của hướng dẫn này là để chứng minh một ví dụ tối thiểu về nhiều công nhân với hai công nhân.

Thành lập

Bắt đầu với một số nhập cần thiết:

import json
import os
import sys

Trước khi nhập TensorFlow, hãy thực hiện một số thay đổi đối với môi trường:

  1. Tắt tất cả các GPU. Điều này ngăn ngừa các lỗi gây ra bởi tất cả các công nhân đang cố gắng sử dụng cùng một GPU. Trong một ứng dụng thế giới thực, mỗi công nhân sẽ làm việc trên một máy khác nhau.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Đặt lại biến môi trường TF_CONFIG (bạn sẽ tìm hiểu thêm về điều này sau):
os.environ.pop('TF_CONFIG', None)
  1. Đảm bảo rằng thư mục hiện tại nằm trên đường dẫn của Python — điều này cho phép sổ ghi chép nhập các tệp được viết bởi %%writefile sau:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Bây giờ nhập TensorFlow:

import tensorflow as tf

Tập dữ liệu và định nghĩa mô hình

Tiếp theo, tạo tệp mnist_setup.py với thiết lập mô hình và tập dữ liệu đơn giản. Tệp Python này sẽ được sử dụng bởi các quy trình công nhân trong hướng dẫn này:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

Đào tạo mô hình trên một công nhân duy nhất

Hãy thử đào tạo mô hình cho một số lượng nhỏ kỷ nguyên và quan sát kết quả của một công nhân để đảm bảo mọi thứ hoạt động chính xác. Khi quá trình đào tạo tiến triển, tổn thất sẽ giảm xuống và độ chính xác sẽ tăng lên.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Cấu hình nhiều nhân viên

Bây giờ chúng ta hãy bước vào thế giới của đào tạo nhiều nhân viên.

Một cụm với các công việc và nhiệm vụ

Trong TensorFlow, đào tạo phân tán bao gồm: một 'cluster' với một số công việc và mỗi công việc có thể có một hoặc nhiều 'task' .

Bạn sẽ cần biến môi trường cấu hình TF_CONFIG để đào tạo trên nhiều máy, mỗi máy có thể có một vai trò khác nhau. TF_CONFIG là một chuỗi JSON được sử dụng để chỉ định cấu hình cụm cho mỗi worker là một phần của cụm.

Có hai thành phần của một biến TF_CONFIG : 'cluster''task' .

  • Một 'cluster' giống nhau đối với tất cả công nhân và cung cấp thông tin về nhóm đào tạo, là một mệnh lệnh bao gồm các loại công việc khác nhau, chẳng hạn như 'worker' hoặc 'chief' .

    • Trong đào tạo nhiều nhân viên với tf.distribute.MultiWorkerMirroredStrategy , thường có một 'worker' đảm nhận các trách nhiệm, chẳng hạn như lưu điểm kiểm tra và viết tệp tóm tắt cho TensorBoard, ngoài những việc mà một 'worker' thông thường làm. 'worker' đó được gọi là công nhân trưởng (với tên công việc là 'chief' ).
    • Theo thông lệ, 'chief' phải chỉ định 'index' 0 (trên thực tế, đây là cách thực hiện tf.distribute.Strategy ).
  • Một 'task' cung cấp thông tin về nhiệm vụ hiện tại và khác nhau đối với từng công nhân. Nó chỉ định 'type''index' của công nhân đó.

Dưới đây là một cấu hình ví dụ:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Đây là cùng một TF_CONFIG được tuần tự hóa thành một chuỗi JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Lưu ý rằng tf_config chỉ là một biến cục bộ trong Python. Để có thể sử dụng nó cho cấu hình đào tạo, lệnh này cần được tuần tự hóa dưới dạng JSON và được đặt trong một biến môi trường TF_CONFIG .

Trong cấu hình ví dụ ở trên, bạn đặt nhiệm vụ 'type' thành 'worker''index' nhiệm vụ thành 0 . Do đó, chiếc máy này là công nhân đầu tiên . Nó sẽ được bổ nhiệm làm công nhân 'chief' và làm nhiều việc hơn những người khác.

Với mục đích minh họa, hướng dẫn này chỉ ra cách bạn có thể thiết lập một biến TF_CONFIG với hai công nhân trên một localhost .

Trong thực tế, bạn sẽ tạo nhiều công nhân trên các địa chỉ / cổng IP bên ngoài và đặt biến TF_CONFIG trên mỗi công nhân tương ứng.

Trong hướng dẫn này, bạn sẽ sử dụng hai công nhân:

  • TF_CONFIG của công nhân đầu tiên ( 'chief' ) được hiển thị ở trên.
  • Đối với công nhân thứ hai, bạn sẽ đặt tf_config['task']['index']=1

Biến môi trường và quy trình con trong sổ ghi chép

Các quy trình con kế thừa các biến môi trường từ cha của chúng.

Ví dụ: bạn có thể đặt một biến môi trường trong quy trình Máy tính xách tay Jupyter này như sau:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Sau đó, bạn có thể truy cập biến môi trường từ một quy trình con:

echo ${GREETINGS}
Hello TensorFlow!

Trong phần tiếp theo, bạn sẽ sử dụng một phương pháp tương tự để chuyển TF_CONFIG đến các quy trình con của worker. Trong một tình huống thực tế, bạn sẽ không bắt đầu công việc của mình theo cách này, nhưng trong ví dụ này là đủ.

Chọn chiến lược phù hợp

Trong TensorFlow, có hai hình thức đào tạo phân tán chính:

  • Đào tạo đồng bộ , trong đó các bước đào tạo được đồng bộ hóa giữa các công nhân và bản sao, và
  • Đào tạo không đồng bộ , trong đó các bước đào tạo không được đồng bộ hóa nghiêm ngặt (ví dụ: đào tạo máy chủ tham số ).

Hướng dẫn này trình bày cách thực hiện đào tạo nhiều nhân viên đồng bộ bằng cách sử dụng một phiên bản của tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy tạo bản sao của tất cả các biến trong các lớp của mô hình trên mỗi thiết bị trên tất cả các công nhân. Nó sử dụng CollectiveOps , một tùy chọn TensorFlow để giao tiếp tập thể, để tổng hợp các gradient và giữ cho các biến được đồng bộ hóa. Hướng dẫn tf.distribute.Strategy có thêm chi tiết về chiến lược này.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy cung cấp nhiều triển khai thông qua tham số tf.distribute.experimental.CommunicationOptions : 1) RING triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp trên nhiều máy chủ; 2) NCCL sử dụng Thư viện liên lạc tập thể NVIDIA để thực hiện tập thể; và 3) AUTO định nghĩa sự lựa chọn trong thời gian chạy. Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng và loại GPU cũng như kết nối mạng trong cụm.

Đào tạo mô hình

Với việc tích hợp API tf.distribute.Strategy vào tf.keras , thay đổi duy nhất bạn sẽ thực hiện để phân phối đào tạo cho nhiều nhân viên là bao gồm việc xây dựng mô hình và cuộc gọi model.compile() bên trong strategy.scope() . Phạm vi của chiến lược phân phối quy định cách thức và vị trí các biến được tạo, và trong trường hợp của MultiWorkerMirroredStrategy , các biến được tạo là MirroredVariable s và chúng được sao chép trên từng công nhân.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Để thực sự chạy với MultiWorkerMirroredStrategy , bạn sẽ cần chạy các quy trình công nhân và chuyển TF_CONFIG cho chúng.

Giống như tệp mnist_setup.py được viết trước đó, đây là main.py mà mỗi công nhân sẽ chạy:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Trong đoạn mã trên, lưu ý rằng global_batch_size , được chuyển đến Dataset.batch , được đặt thành per_worker_batch_size * num_workers . Điều này đảm bảo rằng mỗi công nhân xử lý hàng loạt ví dụ per_worker_batch_size bất kể số lượng công nhân.

Thư mục hiện tại hiện chứa cả hai tệp Python:

ls *.py
main.py
mnist_setup.py

Vì vậy, json-serialize TF_CONFIG và thêm nó vào các biến môi trường:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Bây giờ, bạn có thể khởi chạy một quy trình công nhân sẽ chạy main.py và sử dụng TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Có một số điều cần lưu ý về lệnh trên:

  1. Nó sử dụng %%bash là một "phép thuật" sổ ghi chép để chạy một số lệnh bash.
  2. Nó sử dụng cờ --bg để chạy quá trình bash trong nền, bởi vì công nhân này sẽ không kết thúc. Nó đợi tất cả các công nhân trước khi nó bắt đầu.

Quá trình công nhân nền sẽ không in kết quả ra sổ ghi chép này, vì vậy &> chuyển hướng đầu ra của nó thành một tệp để bạn có thể kiểm tra những gì đã xảy ra trong tệp nhật ký sau này.

Vì vậy, hãy đợi vài giây để quá trình bắt đầu:

import time
time.sleep(10)

Bây giờ, hãy kiểm tra những gì đã được xuất vào tệp nhật ký của worker cho đến nay:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

Dòng cuối cùng của tệp nhật ký sẽ có nội dung: Started server with target: grpc://localhost:12345 . Công nhân đầu tiên hiện đã sẵn sàng và đang đợi tất cả (các) công nhân khác sẵn sàng để tiếp tục.

Vì vậy, hãy cập nhật tf_config cho quy trình của worker thứ hai để nhận:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Khởi chạy công nhân thứ hai. Điều này sẽ bắt đầu đào tạo vì tất cả công nhân đều đang hoạt động (vì vậy không cần phải làm nền tảng cho quá trình này):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Nếu bạn kiểm tra lại nhật ký được viết bởi công nhân đầu tiên, bạn sẽ biết rằng nó đã tham gia đào tạo mô hình đó:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Không có gì đáng ngạc nhiên, điều này chạy chậm hơn so với chạy thử nghiệm ở phần đầu của hướng dẫn này.

Chạy nhiều công nhân trên một máy chỉ làm tăng thêm chi phí.

Mục tiêu ở đây không phải là cải thiện thời gian đào tạo, mà chỉ đưa ra một ví dụ về đào tạo nhiều công nhân.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Đào tạo chuyên sâu cho nhiều nhân viên

Cho đến nay, bạn đã học cách thực hiện thiết lập nhiều nhân viên cơ bản.

Trong phần còn lại của hướng dẫn, bạn sẽ tìm hiểu chi tiết về các yếu tố khác, có thể hữu ích hoặc quan trọng đối với các trường hợp sử dụng thực tế.

Sharding tập dữ liệu

Trong đào tạo nhiều nhân viên, phân tích tập dữ liệu là cần thiết để đảm bảo sự hội tụ và hiệu suất.

Ví dụ trong phần trước dựa trên tự động sạc mặc định được cung cấp bởi API tf.distribute.Strategy . Bạn có thể kiểm soát sharding bằng cách đặt tf.data.experimental.AutoShardPolicy của tf.data.experimental.DistributeOptions .

Để tìm hiểu thêm về tính năng tự động làm sắc nét, hãy tham khảo Hướng dẫn nhập phân tán .

Dưới đây là một ví dụ nhanh về cách tắt chế độ tự động làm sắc nét để mỗi bản sao xử lý mọi ví dụ ( không được khuyến nghị ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Đánh giá

Nếu bạn chuyển validation_data vào Model.fit , nó sẽ luân phiên giữa đào tạo và đánh giá cho mỗi kỷ nguyên. Đánh giá lấy dữ liệu validation_data được phân phối trên cùng một nhóm công nhân và kết quả đánh giá được tổng hợp và có sẵn cho tất cả công nhân.

Tương tự như đào tạo, tập dữ liệu xác thực được tự động chia nhỏ ở cấp tệp. Bạn cần đặt kích thước lô chung trong tập dữ liệu xác thực và đặt bước validation_steps .

Một tập dữ liệu lặp lại cũng được khuyến nghị để đánh giá.

Ngoài ra, bạn cũng có thể tạo một tác vụ khác định kỳ đọc các điểm kiểm tra và chạy đánh giá. Đây là những gì Công cụ ước tính làm. Nhưng đây không phải là cách được khuyến nghị để thực hiện đánh giá và do đó các chi tiết của nó bị bỏ qua.

Màn biểu diễn

Bây giờ bạn có một mô hình Keras đã được thiết lập để chạy trong nhiều nhân viên với MultiWorkerMirroredStrategy .

Để điều chỉnh hiệu suất của đào tạo nhiều nhân viên, bạn có thể thử các cách sau:

  • tf.distribute.MultiWorkerMirroredStrategy cung cấp nhiều triển khai giao tiếp tập thể :

    • RING triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp giữa các máy chủ.
    • NCCL sử dụng Thư viện liên lạc tập thể NVIDIA để thực hiện tập thể.
    • AUTO định nghĩa sự lựa chọn trong thời gian chạy.

    Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng GPU, loại GPU và kết nối mạng trong cụm. Để ghi đè lựa chọn tự động, hãy chỉ định tham số communication_options của phương thức khởi tạo của MultiWorkerMirroredStrategy . Ví dụ:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Truyền các biến tới tf.float nếu có thể:

    • Mô hình ResNet chính thức bao gồm một ví dụ về cách có thể thực hiện điều này.

Khả năng chịu lỗi

Trong đào tạo đồng bộ, cụm sẽ thất bại nếu một trong các công nhân bị lỗi và không tồn tại cơ chế khắc phục lỗi.

Sử dụng Keras với tf.distribute.Strategy có lợi thế về khả năng chịu lỗi trong trường hợp công nhân chết hoặc không ổn định. Bạn có thể thực hiện việc này bằng cách duy trì trạng thái đào tạo trong hệ thống tệp phân tán mà bạn chọn, sao cho khi khởi động lại phiên bản mà trước đó không thành công hoặc đã sử dụng trước, trạng thái đào tạo được phục hồi.

Khi một nhân viên không có mặt, các nhân viên khác sẽ không thực hiện được (có thể sau khi hết thời gian chờ). Trong những trường hợp như vậy, cần khởi động lại công nhân không khả dụng, cũng như các công nhân khác đã bị lỗi.

Gọi lại ModelCheckpoint

Cuộc gọi lại ModelCheckpoint không còn cung cấp chức năng chịu lỗi nữa, hãy sử dụng lệnh gọi lại BackupAndRestore để thay thế.

Lệnh gọi lại ModelCheckpoint vẫn có thể được sử dụng để lưu các điểm kiểm tra. Nhưng với điều này, nếu quá trình đào tạo bị gián đoạn hoặc kết thúc thành công, để tiếp tục đào tạo từ điểm kiểm tra, người dùng có trách nhiệm tải mô hình theo cách thủ công.

Theo tùy chọn, người dùng có thể chọn lưu và khôi phục mô hình / trọng số bên ngoài lệnh gọi lại ModelCheckpoint .

Lưu và tải mô hình

Để lưu mô hình của bạn bằng cách sử dụng model.save hoặc tf.saved_model.save , đích lưu cần phải khác nhau đối với mỗi nhân viên.

  • Đối với những người không phải là công nhân chính, bạn sẽ cần phải lưu mô hình vào một thư mục tạm thời.
  • Đối với trưởng, bạn sẽ cần phải lưu vào thư mục mô hình được cung cấp.

Các thư mục tạm thời trên worker cần phải là duy nhất để tránh lỗi do nhiều worker cố gắng ghi vào cùng một vị trí.

Mô hình được lưu trong tất cả các thư mục là giống hệt nhau và thường chỉ mô hình được lưu bởi trưởng mới được tham chiếu để khôi phục hoặc phục vụ.

Bạn nên có một số logic dọn dẹp để xóa các thư mục tạm thời được tạo bởi công nhân sau khi quá trình đào tạo của bạn đã hoàn thành.

Lý do để tiết kiệm đồng thời cho trưởng và công nhân là vì bạn có thể đang tổng hợp các biến trong quá trình kiểm tra, điều này yêu cầu cả trưởng và công nhân tham gia vào giao thức giao tiếp allreduce. Mặt khác, việc để trưởng và công nhân lưu vào cùng một thư mục mô hình sẽ dẫn đến sai sót do tranh chấp.

Sử dụng MultiWorkerMirroredStrategy , chương trình được chạy trên mọi worker và để biết liệu worker hiện tại có phải là trưởng hay không, nó sẽ tận dụng đối tượng trình phân giải cụm có các thuộc tính task_typetask_id :

  • task_type cho bạn biết công việc hiện tại là gì (ví dụ: 'worker' ).
  • task_id cho bạn biết mã định danh của worker.
  • Công nhân có task_id == 0 được chỉ định là công nhân chính.

Trong đoạn mã bên dưới, hàm write_filepath cung cấp đường dẫn tệp để ghi, đường dẫn này phụ thuộc vào task_id của worker:

  • Đối với công nhân trưởng (với task_id == 0 ), nó ghi vào đường dẫn tệp gốc.
  • Đối với các worker khác, nó tạo một thư mục tạm temp_dir —với task_id trong đường dẫn thư mục để ghi vào:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Với điều đó, bây giờ bạn đã sẵn sàng để tiết kiệm:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Như đã mô tả ở trên, về sau, mô hình chỉ nên được tải từ đường dẫn trưởng được lưu vào, vì vậy hãy xóa các mô hình tạm thời mà không phải công nhân trưởng đã lưu:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Bây giờ, khi đã đến lúc tải, hãy sử dụng API tf.keras.models.load_model tiện lợi và tiếp tục với công việc tiếp theo.

Ở đây, giả sử chỉ sử dụng một nhân viên duy nhất để tải và tiếp tục đào tạo, trong trường hợp đó, bạn không gọi tf.keras.models.load_model trong một strategy.scope() (lưu ý rằng strategy = tf.distribute.MultiWorkerMirroredStrategy() , như đã định nghĩa trước đó ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Lưu và khôi phục điểm kiểm tra

Mặt khác, điểm kiểm tra cho phép bạn lưu trọng lượng của mô hình và khôi phục chúng mà không cần phải lưu toàn bộ mô hình.

Tại đây, bạn sẽ tạo một tf.train.Checkpoint theo dõi mô hình, được quản lý bởi tf.train.CheckpointManager , để chỉ có điểm kiểm tra mới nhất được giữ nguyên:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Khi CheckpointManager được thiết lập, bây giờ bạn đã sẵn sàng để lưu và xóa các trạm kiểm soát mà những người không phải là nhân viên chính đã lưu:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Bây giờ, khi bạn cần khôi phục mô hình, bạn có thể tìm thấy điểm kiểm tra mới nhất được lưu bằng cách sử dụng hàm tf.train.latest_checkpoint tiện lợi. Sau khi khôi phục điểm kiểm tra, bạn có thể tiếp tục đào tạo.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

Gọi lại BackupAndRestore

Lệnh gọi lại tf.keras.callbacks.BackupAndRestore cung cấp chức năng chịu lỗi bằng cách sao lưu mô hình và số kỷ nguyên hiện tại trong tệp điểm kiểm tra tạm thời dưới đối số backup_dir cho BackupAndRestore . Điều này được thực hiện vào cuối mỗi kỷ nguyên.

Khi công việc bị gián đoạn và khởi động lại, lệnh gọi lại sẽ khôi phục điểm kiểm tra cuối cùng và quá trình đào tạo tiếp tục từ đầu kỷ nguyên bị gián đoạn. Bất kỳ quá trình đào tạo từng phần nào đã được thực hiện trong kỷ nguyên chưa hoàn thành trước khi bị gián đoạn sẽ bị loại bỏ, để nó không ảnh hưởng đến trạng thái mô hình cuối cùng.

Để sử dụng nó, hãy cung cấp một phiên bản của tf.keras.callbacks.BackupAndRestore tại lệnh gọi Model.fit .

Với MultiWorkerMirroredStrategy , nếu một worker bị gián đoạn, toàn bộ cụm sẽ tạm dừng cho đến khi khởi động lại worker bị gián đoạn. Các công nhân khác cũng sẽ khởi động lại và công nhân bị gián đoạn tham gia lại cụm. Sau đó, mọi công nhân đọc tệp điểm kiểm tra đã được lưu trước đó và chọn trạng thái cũ của nó, do đó cho phép cụm đồng bộ trở lại. Sau đó, quá trình đào tạo tiếp tục.

Lệnh gọi lại BackupAndRestore sử dụng CheckpointManager để lưu và khôi phục trạng thái đào tạo, tạo một tệp có tên là điểm kiểm tra theo dõi các điểm kiểm tra hiện có cùng với điểm kiểm tra mới nhất. Vì lý do này, backup_dir không nên được sử dụng lại để lưu trữ các trạm kiểm soát khác nhằm tránh xung đột tên.

Hiện tại, lệnh gọi lại BackupAndRestore hỗ trợ đào tạo một nhân viên mà không cần chiến lược— MirroredStrategy —và đào tạo nhiều nhân viên với MultiWorkerMirroredStrategy .

Dưới đây là hai ví dụ cho cả đào tạo nhiều người lao động và đào tạo một người lao động:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

Nếu bạn kiểm tra thư mục backup_dir mà bạn đã chỉ định trong BackupAndRestore , bạn có thể nhận thấy một số tệp điểm kiểm tra được tạo tạm thời. Các tệp đó cần thiết để khôi phục các phiên bản đã mất trước đó và chúng sẽ bị thư viện xóa ở cuối Model.fit khi thoát thành công khóa đào tạo của bạn.

Các nguồn bổ sung

  1. Hướng dẫn đào tạo Phân phối trong TensorFlow cung cấp tổng quan về các chiến lược phân phối có sẵn.
  2. Vòng đào tạo tùy chỉnh với Keras và MultiWorkerMirroredStrategy hướng dẫn cách sử dụng MultiWorkerMirroredStrategy với Keras và một vòng đào tạo tùy chỉnh.
  3. Kiểm tra các mô hình chính thức, nhiều mô hình trong số đó có thể được định cấu hình để chạy nhiều chiến lược phân phối.
  4. Hướng dẫn Hiệu suất tốt hơn với tf. Chức năng cung cấp thông tin về các chiến lược và công cụ khác, chẳng hạn như TensorFlow Profiler mà bạn có thể sử dụng để tối ưu hóa hiệu suất của các mô hình TensorFlow của mình.