Trang này được dịch bởi Cloud Translation API.
Switch to English

Đầu vào phân tán

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ tay

Các API tf.distribute cung cấp một cách dễ dàng để người dùng mở rộng quy mô đào tạo của họ từ một máy thành nhiều máy. Khi mở rộng mô hình của họ, người dùng cũng phải phân phối đầu vào của họ trên nhiều thiết bị. tf.distribute cung cấp các API mà bạn có thể tự động phân phối thông tin đầu vào trên các thiết bị.

Hướng dẫn này sẽ chỉ cho bạn các cách khác nhau mà bạn có thể tạo tập dữ liệu phân tán và trình vòng lặp bằng cách sử dụng API tf.distribute . Ngoài ra, các chủ đề sau sẽ được đề cập:

Hướng dẫn này không đề cập đến việc sử dụng đầu vào phân tán với API Keras.

Tập dữ liệu được phân phối

Để sử dụng các API tf.distribute để mở rộng quy mô, người dùng nên sử dụngtf.data.Dataset để đại diện cho đầu vào của họ. tf.distribute đã được thực hiện để hoạt động hiệu quả vớitf.data.Dataset (ví dụ: tự động tìm nạp trước dữ liệu trên mỗi thiết bị tăng tốc) với việc tối ưu hóa hiệu suất thường xuyên được kết hợp vào việc triển khai. Nếu bạn có trường hợp sử dụng để sử dụng thứ gì đó khác vớitf.data.Dataset , vui lòng tham khảo phần sau của hướng dẫn này. Trong vòng lặp đào tạo không phân tán, trước tiên người dùng tạo một cá thểtf.data.Dataset và sau đó lặp lại các phần tử. Ví dụ:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.3.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Để cho phép người dùng sử dụng chiến lược tf.distribute với những thay đổi tối thiểu đối với mã hiện có của người dùng, hai API đã được giới thiệu sẽ phân phối mộttf.data.Dataset và trả về một đối tượng tập dữ liệu phân tán. Sau đó, người dùng có thể lặp lại phiên bản tập dữ liệu phân tán này và đào tạo mô hình của họ như trước. Bây giờ chúng ta hãy xem xét hai API - tf.distribute.Strategy.experimental_distribute_datasettf.distribute.Strategy.experimental_distribute_datasets_from_function năng chi tiết hơn:

tf.distribute.Strategy.experimental_distribute_dataset

Sử dụng

API này lấy một cá thểtf.data.Dataset làm đầu vào và trả về một cá thể tf.distribute.DistributedDataset . Bạn nên nhập hàng loạt tập dữ liệu đầu vào với giá trị bằng kích thước lô chung. Kích thước lô toàn cầu này là số lượng mẫu mà bạn muốn xử lý trên tất cả các thiết bị trong 1 bước. Bạn có thể lặp lại trên bộ dữ liệu phân phối này trong một thời trang Pythonic hoặc tạo một iterator sử dụng iter . Đối tượng trả về không phải là mộttf.data.Dataset và không hỗ trợ bất kỳ API nào khác có thể chuyển đổi hoặc kiểm tra tập dữ liệu theo bất kỳ cách nào. Đây là API được đề xuất nếu bạn không có các cách cụ thể mà bạn muốn phân đoạn đầu vào của mình thành các bản sao khác nhau.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

Tính chất

Hàng loạt

tf.distribute bắt lại cá thểtf.data.Dataset đầu vào với kích thước lô mới bằng với kích thước lô toàn cầu chia cho số bản sao đồng bộ. Số lượng bản sao được đồng bộ hóa bằng số lượng thiết bị đang tham gia phân bổ gradient trong quá trình đào tạo. Khi người dùng gọi next trên trình vòng lặp phân tán, kích thước hàng loạt của mỗi bản sao dữ liệu được trả về trên mỗi bản sao. Số lượng bản sao của tập dữ liệu được phục hồi sẽ luôn là bội số của số bản sao. Dưới đây là một số ví dụ:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Không có phân phối:
    • Đợt 1: [0, 1, 2, 3]
    • Đợt 2: [4, 5]
    • Với phân phối trên 2 bản sao. Lô cuối cùng ([4, 5]) được chia thành 2 bản sao.

    • Đợt 1:

      • Bản sao 1: [0, 1]
      • Bản sao 2: [2, 3]
    • Đợt 2:

      • Bản sao 2: [4]
      • Bản sao 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Không có phân phối:
    • Đợt 1: [[0], [1], [2], [3]]
    • Với phân phối trên 5 bản sao:
    • Đợt 1:
      • Bản sao 1: [0]
      • Bản sao 2: [1]
      • Bản sao 3: [2]
      • Bản sao 4: [3]
      • Bản sao 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Không có phân phối:
    • Đợt 1: [0, 1, 2, 3]
    • Đợt 2: [4, 5, 6, 7]
    • Với phân phối trên 3 bản sao:
    • Đợt 1:
      • Bản sao 1: [0, 1]
      • Bản sao 2: [2, 3]
      • Bản sao 3: []
    • Đợt 2:
      • Bản sao 1: [4, 5]
      • Bản sao 2: [6, 7]
      • Bản sao 3: []

Việc so khớp tập dữ liệu có độ phức tạp về không gian tăng tuyến tính với số lượng bản sao. Điều này có nghĩa là đối với trường hợp sử dụng đào tạo nhiều nhân viên, đường ống đầu vào có thể gặp lỗi OOM.

Mài

tf.distribute cũng tự động cứng lại tập dữ liệu đầu vào trong đào tạo nhiều nhân viên. Mỗi tập dữ liệu được tạo trên thiết bị CPU của nhân viên. Tự động tích hợp tập dữ liệu qua một tập hợp công nhân có nghĩa là mỗi công nhân được gán một tập hợp con của toàn bộ tập dữ liệu (nếu đặt đúng tf.data.experimental.AutoShardPolicy ). Điều này là để đảm bảo rằng ở mỗi bước, kích thước lô toàn cầu của các phần tử tập dữ liệu không chồng chéo sẽ được xử lý bởi từng công nhân. Tự động sạc có một vài tùy chọn khác nhau có thể được chỉ định bằng cách sử dụng tf.data.experimental.DistributeOptions .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

Có ba tùy chọn khác nhau mà bạn có thể đặt cho tf.data.experimental.AutoShardPolicy :

  • AUTO: Đây là tùy chọn mặc định có nghĩa là một nỗ lực sẽ được thực hiện để phân đoạn bằng FILE. Nỗ lực phân đoạn bằng FILE không thành công nếu tập dữ liệu dựa trên tệp không được phát hiện. tf.distribute sau đó sẽ trở lại sharding bởi DATA. Lưu ý rằng nếu tập dữ liệu đầu vào là dựa trên tệp nhưng số tệp ít hơn số công nhân, một InvalidArgumentError sẽ xuất hiện. Nếu điều này xảy ra, hãy đặt chính sách thành AutoShardPolicy.DATA một cách rõ ràng hoặc chia nguồn đầu vào của bạn thành các tệp nhỏ hơn sao cho số tệp lớn hơn số công nhân.
  • FILE: Đây là tùy chọn nếu bạn muốn chia nhỏ các tệp đầu vào trên tất cả các worker. Bạn nên sử dụng tùy chọn này nếu số lượng tệp đầu vào lớn hơn nhiều so với số lượng nhân công và dữ liệu trong tệp được phân bổ đồng đều. Nhược điểm của tùy chọn này là có nhân viên nhàn rỗi nếu dữ liệu trong các tệp không được phân phối đồng đều. Nếu số lượng tệp ít hơn số lượng công nhân, một InvalidArgumentError sẽ xuất hiện. Nếu điều này xảy ra, hãy đặt chính sách một cách rõ ràng thành AutoShardPolicy.DATA . Ví dụ: chúng ta hãy phân phối 2 tệp cho 2 công nhân với 1 bản sao mỗi tệp. Tệp 1 chứa [0, 1, 2, 3, 4, 5] và Tệp 2 chứa [6, 7, 8, 9, 10, 11]. Đặt tổng số bản sao được đồng bộ hóa là 2 và kích thước lô toàn cầu là 4.

    • Công nhân 0:
    • Đợt 1 = Bản sao 1: [0, 1]
    • Batch 2 = Replica 1: [2, 3]
    • Batch 3 = Replica 1: [4]
    • Đợt 4 = Bản sao 1: [5]
    • Công nhân 1:
    • Đợt 1 = Bản sao 2: [6, 7]
    • Batch 2 = Replica 2: [8, 9]
    • Batch 3 = Replica 2: [10]
    • Batch 4 = Replica 2: [11]
  • DỮ LIỆU: Điều này sẽ tự động đóng cứng các phần tử trên tất cả các công nhân. Mỗi công nhân sẽ đọc toàn bộ tập dữ liệu và chỉ xử lý phân đoạn được gán cho nó. Tất cả các phân đoạn khác sẽ bị loại bỏ. Điều này thường được sử dụng nếu số lượng tệp đầu vào ít hơn số lượng nhân viên và bạn muốn phân tích dữ liệu tốt hơn trên tất cả các nhân viên. Nhược điểm là toàn bộ tập dữ liệu sẽ được đọc trên mỗi worker. Ví dụ: hãy để chúng tôi phân phối 1 tệp trên 2 công nhân. Tệp 1 chứa [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Hãy để tổng số bản sao đồng bộ là 2.

    • Công nhân 0:
    • Đợt 1 = Bản sao 1: [0, 1]
    • Lô 2 = Bản sao 1: [4, 5]
    • Batch 3 = Replica 1: [8, 9]
    • Công nhân 1:
    • Đợt 1 = Bản sao 2: [2, 3]
    • Batch 2 = Replica 2: [6, 7]
    • Batch 3 = Replica 2: [10, 11]
  • TẮT: Nếu bạn tắt tính năng tự động sạc, mỗi nhân viên sẽ xử lý tất cả dữ liệu. Ví dụ: chúng ta hãy phân phối 1 tệp cho 2 công nhân. Tệp 1 chứa [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Hãy để tổng số bản sao được đồng bộ hóa là 2. Sau đó, mỗi công nhân sẽ thấy phân phối sau:

    • Công nhân 0:
    • Đợt 1 = Bản sao 1: [0, 1]
    • Batch 2 = Replica 1: [2, 3]
    • Batch 3 = Replica 1: [4, 5]
    • Đợt 4 = Bản sao 1: [6, 7]
    • Đợt 5 = Bản sao 1: [8, 9]
    • Đợt 6 = Bản sao 1: [10, 11]

    • Công nhân 1:

    • Đợt 1 = Bản sao 2: [0, 1]

    • Batch 2 = Replica 2: [2, 3]

    • Batch 3 = Replica 2: [4, 5]

    • Batch 4 = Replica 2: [6, 7]

    • Lô 5 = Bản sao 2: [8, 9]

    • Lô 6 = Bản sao 2: [10, 11]

Tìm nạp trước

Theo mặc định, tf.distribute thêm một chuyển đổi tìm nạp trước ở cuối phiên bảntf.data.Dataset do người dùng cung cấp. Đối số cho phép chuyển đổi tìm nạp trước là buffer_size bằng với số lượng bản sao được đồng bộ hóa.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

Sử dụng

API này nhận một hàm đầu vào và trả về một cá thể tf.distribute.DistributedDataset . Hàm đầu vào mà người dùng truyền vào có đối số tf.distribute.InputContext và phải trả về một cá thểtf.data.Dataset . Với API này, tf.distribute không thực hiện thêm bất kỳ thay đổi nào đối với cá thểtf.data.Dataset của người dùng được trả về từ hàm đầu vào. Người dùng có trách nhiệm phân lô và chia nhỏ tập dữ liệu. tf.distribute gọi hàm đầu vào trên thiết bị CPU của từng công nhân. Ngoài việc cho phép người dùng chỉ định lô-gic và sharding của riêng họ, API này cũng thể hiện khả năng mở rộng và hiệu suất tốt hơn so với tf.distribute.Strategy.experimental_distribute_dataset khi được sử dụng để đào tạo nhiều nhân viên.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Tính chất

Hàng loạt

tf.data.Dataset thểtf.data.Dataset là giá trị trả về của hàm đầu vào nên được phân lô bằng cách sử dụng kích thước lô của mỗi bản sao. Kích thước lô mỗi bản sao là kích thước lô chung chia cho số bản sao đang tham gia đào tạo đồng bộ. Điều này là do tf.distribute gọi hàm đầu vào trên thiết bị CPU của mỗi công nhân. Tập dữ liệu được tạo trên một worker nhất định phải sẵn sàng để sử dụng bởi tất cả các bản sao trên worker đó.

Mài

Đối tượng tf.distribute.InputContext được truyền ngầm làm đối số cho hàm đầu vào của người dùng được tạo bởi tf.distribute bên dưới. Nó có thông tin về số lượng công nhân, id công nhân hiện tại, v.v. Hàm đầu vào này có thể xử lý sharding theo chính sách do người dùng đặt bằng cách sử dụng các thuộc tính này là một phần của đối tượng tf.distribute.InputContext .

Tìm nạp trước

tf.distribute không thêm phép chuyển đổi tìm nạp trước vào cuốitf.data.Dataset do hàm nhập do người dùng cung cấp trả về.

Trình lặp phân tán

Tương tự như các trường hợptf.data.Dataset không được phân phối, bạn sẽ cần tạo một trình lặp trên các cá thể tf.distribute.DistributedDataset để lặp lại nó và truy cập các phần tử trong tf.distribute.DistributedDataset . Sau đây là những cách mà bạn có thể tạo tf.distribute.DistributedIterator và sử dụng nó để đào tạo mô hình của bạn:

Tập quán

Sử dụng cấu trúc vòng lặp cho Pythonic

Bạn có thể sử dụng vòng lặp Pythonic thân thiện với người dùng để lặp qua tf.distribute.DistributedDataset . Các phần tử được trả về từ tf.distribute.DistributedIterator có thể là một tf.Tensor hoặc tf.distribute.DistributedValues chứa một giá trị trên mỗi bản sao. Đặt vòng lặp bên trong tf.function sẽ giúp tăng hiệu suất. Tuy nhiên, breakreturn hiện không được hỗ trợ cho một vòng lặp qua tf.distribute.DistributedDataset được đặt bên trong chức năng tf.function . Chúng tôi cũng không hỗ trợ đặt vòng lặp bên trong tf.function khi sử dụng các chiến lược nhiều nhân viên như tf.distribute.experimental.MultiWorkerMirroredStrategytf.distribute.TPUStrategy . Việc đặt vòng lặp bên trong tf.function hoạt động đối với một worker tf.distribute.TPUStrategy nhưng không hoạt động khi sử dụng TPU pod.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Sử dụng iter để tạo một trình lặp rõ ràng

Để lặp lại các phần tử trong cá thể tf.distribute.DistributedDataset , bạn có thể tạo một tf.distribute.DistributedIterator bằng cách sử dụng API iter trên đó. Với một trình lặp rõ ràng, bạn có thể lặp lại cho một số bước cố định. Để có được các yếu tố tiếp theo từ một tf.distribute.DistributedIterator dụ dist_iterator , bạn có thể gọi next(dist_iterator) , dist_iterator.get_next() , hoặc dist_iterator.get_next_as_optional() . Hai cái trước về cơ bản giống nhau:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

Với next() hoặc tf.distribute.DistributedIterator.get_next() , nếu tf.distribute.DistributedIterator đã kết thúc, lỗi OutOfRange sẽ được đưa ra. Máy khách có thể bắt lỗi ở phía python và tiếp tục thực hiện các công việc khác như kiểm tra và đánh giá. Tuy nhiên, điều này sẽ không hoạt động nếu bạn đang sử dụng vòng lặp đào tạo máy chủ (tức là chạy nhiều bước cho mỗi tf.function ), giống như sau:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn chứa nhiều bước bằng cách bao bọc phần thân bước bên trong tf.range . Trong trường hợp này, các lần lặp khác nhau trong vòng lặp không có phụ thuộc có thể bắt đầu song song, do đó, lỗi OutOfRange có thể được kích hoạt trong các lần lặp sau trước khi quá trình tính toán các lần lặp trước kết thúc. Một khi lỗi OutOfRange được tạo ra, tất cả các hoạt động trong hàm sẽ bị chấm dứt ngay lập tức. Nếu đây là một số trường hợp mà bạn muốn tránh, một giải pháp thay thế không gây ra lỗi tf.distribute.DistributedIterator.get_next_as_optional()tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional trả về một tf.experimental.Optional chứa phần tử tiếp theo hoặc không có giá trị nào nếu tf.distribute.DistributedIterator đã kết thúc.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Sử dụng thuộc tính element_spec

Nếu bạn chuyển các phần tử của một tập dữ liệu phân tán đến một tf.function và muốn đảm bảo tf.TypeSpec , bạn có thể chỉ định đối số input_signature của tf.function . tf.function . Đầu ra của tập dữ liệu phân tán là tf.distribute.DistributedValues có thể đại diện cho đầu vào cho một thiết bị hoặc nhiều thiết bị. Để nhận tf.TypeSpec tương ứng với giá trị phân tán này, bạn có thể sử dụng thuộc tính element_spec của tập dữ liệu phân tán hoặc đối tượng trình vòng lặp phân tán.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Hàng loạt một phần

Gặp phải các lô một phần khi các cá thểtf.data.Dataset mà người dùng tạo có thể chứa các kích thước lô không chia hết cho số lượng bản sao hoặc khi số lượng bản sao của cá thể tập dữ liệu không chia hết cho kích thước lô. Điều này có nghĩa là khi tập dữ liệu được phân phối trên nhiều bản sao, next gọi next trên một số trình vòng lặp sẽ dẫn đến lỗi OutOfRangeError. Để xử lý trường hợp sử dụng này, tf.distribute trả về lô giả có kích thước lô 0 trên các bản sao không có thêm bất kỳ dữ liệu nào để xử lý.

Đối với trường hợp worker đơn lẻ, nếu dữ liệu không được trả về bởi cuộc gọi next trên trình vòng lặp, các lô giả có kích thước lô 0 được tạo và sử dụng cùng với dữ liệu thực trong tập dữ liệu. Trong trường hợp các lô một phần, lô dữ liệu toàn cục cuối cùng sẽ chứa dữ liệu thực cùng với các lô dữ liệu giả. Điều kiện dừng để xử lý dữ liệu hiện kiểm tra xem có bất kỳ bản sao nào có dữ liệu hay không. Nếu không có dữ liệu trên bất kỳ bản sao nào, thì lỗi OutOfRange sẽ xảy ra.

Đối với trường hợp multi worker, giá trị boolean đại diện cho sự hiện diện của dữ liệu trên từng worker được tổng hợp bằng cách sử dụng giao tiếp bản sao chéo và giá trị này được sử dụng để xác định xem tất cả các worker đã xử lý xong tập dữ liệu phân tán hay chưa. Vì điều này liên quan đến giao tiếp giữa các nhân viên nên có một số hình phạt về hiệu suất liên quan.

Cảnh báo

  • Khi sử dụng API tf.distribute.Strategy.experimental_distribute_dataset với thiết lập nhiều worker, người dùng chuyển mộttf.data.Dataset đọc từ các tệp. Nếu tf.data.experimental.AutoShardPolicy được đặt thành AUTO hoặc FILE , kích thước lô thực tế trên mỗi bước có thể nhỏ hơn kích thước lô toàn cầu do người dùng xác định. Điều này có thể xảy ra khi các phần tử còn lại trong tệp nhỏ hơn kích thước lô chung. Người dùng có thể sử dụng hết tập dữ liệu mà không phụ thuộc vào số bước chạy hoặc đặt tf.data.experimental.AutoShardPolicy thành DATA để giải quyết vấn đề đó.

  • Chuyển đổi tập dữ liệu trạng thái hiện không được hỗ trợ với tf.distribute và bất kỳ hoạt động trạng thái nào mà tập dữ liệu có thể có hiện đang bị bỏ qua. Ví dụ: nếu tập dữ liệu của bạn có map_fn sử dụng tf.random.uniform để xoay hình ảnh, thì bạn có biểu đồ tập dữ liệu phụ thuộc vào trạng thái (tức là hạt ngẫu nhiên) trên máy cục bộ nơi quá trình python đang được thực hiện.

  • Thử nghiệm tf.data.experimental.OptimizationOptions Các tf.data.experimental.OptimizationOptions bị tắt theo mặc định có thể trong một số ngữ cảnh nhất định - chẳng hạn như khi được sử dụng cùng với tf.distribute - gây ra sự suy giảm hiệu suất. Bạn chỉ nên bật chúng sau khi xác thực rằng chúng có lợi cho hiệu suất khối lượng công việc của bạn trong cài đặt phân phối.

  • Thứ tự dữ liệu được xử lý bởi công nhân khi sử dụng tf.distribute.experimental_distribute_dataset hoặc tf.distribute.experimental_distribute_datasets_from_function không được đảm bảo. Điều này thường được yêu cầu nếu bạn đang sử dụng tf.distribute để mở rộng dự đoán. Tuy nhiên, bạn có thể chèn chỉ mục cho mỗi phần tử trong kết quả đầu ra theo thứ tự và hàng loạt tương ứng. Đoạn mã sau là một ví dụ về cách sắp xếp đầu ra.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

Làm cách nào để phân phối dữ liệu của tôi nếu tôi không sử dụng phiên bản tf.data.Dataset chuẩn?

Đôi khi người dùng không thể sử dụngtf.data.Dataset để đại diện cho đầu vào của họ và sau đó là các API được đề cập ở trên để phân phối tập dữ liệu cho nhiều thiết bị. Trong những trường hợp như vậy, bạn có thể sử dụng bộ căng thô hoặc đầu vào từ máy phát điện.

Sử dụng thử nghiệm_distribute_values_from_ Chức năng cho các đầu vào tensor tùy ý

strategy.run tf.distribute.DistributedValues chấp nhận tf.distribute.DistributedValues là đầu ra của next(iterator) . Để chuyển các giá trị tensor, hãy sử dụng experimental_distribute_values_from_function tf.distribute.DistributedValues để tạo tf.distribute.DistributedValues từ các tensor thô.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Sử dụng tf.data.Dataset.from_generator nếu đầu vào của bạn là từ trình tạo

Nếu bạn có một hàm trình tạo mà bạn muốn sử dụng, bạn có thể tạo mộttf.data.Dataset bằng cách sử dụng API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)