امروز برای رویداد محلی TensorFlow خود در همه جا پاسخ دهید!
این صفحه به‌وسیله ‏Cloud Translation API‏ ترجمه شده است.
Switch to English

ورودی توزیع شده

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

tf.distribute API راهی آسان برای کاربران برای مقیاس گذاری آموزش خود از یک ماشین به چندین ماشین فراهم می کند. هنگام مقیاس گذاری مدل خود ، کاربران همچنین باید ورودی خود را در چندین دستگاه توزیع کنند. tf.distribute با استفاده از آنها می توانید ورودی خود را در دستگاه ها توزیع کنید.

این راهنما به شما روش های مختلفی را می دهد که می توانید با استفاده از tf.distribute API مجموعه داده و تکرار کننده توزیع شده را ایجاد کنید. علاوه بر این ، عناوین زیر پوشش داده خواهد شد:

این راهنما کاربرد ورودی توزیع شده با API های Keras را پوشش نمی دهد.

مجموعه داده های توزیع شده

برای استفاده از tf.distribute API برای مقیاس بندی ، توصیه می شود که کاربران ازtf.data.Dataset برای نشان دادن ورودی خود استفاده کنند. tf.distribute ساخته شده است تا باtf.data.Dataset (به عنوان مثال ، پیش نصب خودکار داده ها روی هر دستگاه شتاب دهنده) به طور کارآمد کار کند و بهینه سازی عملکرد به طور منظم در اجرا گنجانده شود. اگر مورد استفاده برای استفاده از مورد دیگری غیر ازtf.data.Dataset دارید ، لطفاً به بخش بعدی در این راهنما مراجعه کنید. در یک حلقه آموزش غیر توزیع شده ، کاربران ابتدا یک نمونهtf.data.Dataset ایجاد می کنند و سپس بر روی عناصر تکرار می شوند. مثلا:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.4.0

global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

برای اجازه دادن به کاربران برای استفاده از استراتژی tf.distribute با کمترین تغییر در کد موجود کاربر ، دو API معرفی شد که یک نمونهtf.data.Dataset را توزیع میtf.data.Dataset و یک شی مجموعه داده توزیع شده را برمی گردانند. سپس یک کاربر می تواند این نمونه مجموعه داده توزیع شده را تکرار کرده و مدل خود را مانند قبل آموزش دهد. بیایید اکنون به دو API - tf.distribute.Strategy.experimental_distribute_dataset و tf.distribute.Strategy.distribute_datasets_from_function با جزئیات بیشتر tf.distribute.Strategy.distribute_datasets_from_function :

tf.distribute.Strategy.experimental_distribute_dataset

استفاده

این API یک نمونهtf.data.Dataset به عنوان ورودی گرفته و یک نمونه tf.distribute.DistributedDataset برمی گرداند. شما باید مجموعه داده ورودی را با مقداری برابر با اندازه دسته جهانی کنید. این اندازه دسته جهانی تعداد نمونه هایی است که می خواهید در 1 مرحله در همه دستگاه ها پردازش کنید. می توانید روی این مجموعه داده توزیع شده به صورت Pythonic تکرار کنید یا یک تکرار کننده با استفاده از iter ایجاد کنید. شیtf.data.Dataset نمونه ای ازtf.data.Dataset نیست و هیچ API دیگری را که به هیچ وجه مجموعه داده را تغییر داده یا بازرسی می کند پشتیبانی نمی کند. اگر روش خاصی ندارید که بخواهید ورودی خود را بر روی کپی های مختلف خرد کنید ، این API توصیه شده است.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

خواص

دسته بندی

tf.distribute ورودیtf.data.Dataset با اندازه دسته ای جدید که برابر با اندازه دسته جهانی است تقسیم بر تعداد نسخه های همگام سازی ،tf.data.Dataset مورد استفاده قرار می دهد. تعداد ماکت های همگام سازی برابر است با تعداد دستگاه هایی که در طول آموزش در کاهش گرادیان شرکت می کنند. هنگامی که یک کاربر با تکرار توزیع شده تماس next را می گیرد ، هر دسته از داده ها به ازای هر ماکت ، اندازه ای از داده ها برمی گردد. کاردیتالیته مجموعه داده های دوباره بکار رفته همیشه چندین برابر از تعداد نسخه ها خواهد بود. در اینجا چند مثال آورده شده است:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • بدون توزیع:
    • دسته 1: [0 ، 1 ، 2 ، 3]
    • دسته 2: [4 ، 5]
    • با توزیع بیش از 2 نسخه. آخرین دسته ([4 ، 5]) بین 2 نسخه تقسیم می شود.

    • دسته 1:

      • ماکت 1: [0 ، 1]
      • ماکت 2: [2 ، 3]
    • دسته 2:

      • ماکت 2: [4]
      • ماکت 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • بدون توزیع:
    • دسته 1: [[0] ، [1] ، [2] ، [3]]
    • با توزیع بیش از 5 نسخه:
    • دسته 1:
      • ماکت 1: [0]
      • ماکت 2: [1]
      • ماکت 3: [2]
      • ماکت 4: [3]
      • ماکت 5: []
  • tf.data.Dataset.range(8).batch(4)

    • بدون توزیع:
    • دسته 1: [0 ، 1 ، 2 ، 3]
    • دسته 2: [4 ، 5 ، 6 ، 7]
    • با توزیع بیش از 3 نسخه:
    • دسته 1:
      • ماکت 1: [0 ، 1]
      • ماکت 2: [2 ، 3]
      • ماکت 3: []
    • دسته 2:
      • ماکت 1: [4 ، 5]
      • ماکت 2: [6 ، 7]
      • ماکت 3: []

دوباره بکارگیری مجموعه داده دارای پیچیدگی فضایی است که به طور خطی با تعداد نسخه ها افزایش می یابد. این بدان معنی است که برای موارد استفاده از آموزش چند کارگر ، خط لوله ورودی می تواند با خطاهای OOM روبرو شود.

خرد کردن

tf.distribute همچنین مجموعه داده های ورودی را در آموزش چند کارگر با MultiWorkerMirroredStrategy و TPUStrategy . هر مجموعه داده در دستگاه CPU کارگر ایجاد می شود. سخت افزاری خودکار مجموعه داده بر روی مجموعه ای از کارگران به این معنی است که به هر کارگر زیر مجموعه ای از کل مجموعه داده اختصاص داده شده است (اگر tf.data.experimental.AutoShardPolicy درست تنظیم شده باشد). این امر برای اطمینان از این است که در هر مرحله ، اندازه دسته ای جهانی عناصر مجموعه داده غیر همپوشانی توسط هر کارگر پردازش می شود. اتوشاردینگ چند گزینه مختلف دارد که می توان با استفاده از tf.data.experimental.DistributeOptions مشخص tf.data.experimental.DistributeOptions . توجه داشته باشید که در آموزش چند کاره با ParameterServerStrategy هیچ سختکاری خودکار وجود ندارد و اطلاعات بیشتر در مورد ایجاد مجموعه داده با این استراتژی را می توانید در آموزش Parameter Server Strategy بیابید .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

سه گزینه مختلف وجود دارد که می توانید برای tf.data.experimental.AutoShardPolicy :

  • AUTO: این گزینه پیش فرض است که به معنای تلاش برای خرد کردن توسط FILE است. اگر یک مجموعه داده مبتنی بر فایل شناسایی نشود ، تلاش برای خرد کردن توسط FILE انجام نمی شود. tf.distribute سپس به Darda تبدیل خواهد شد. توجه داشته باشید که اگر مجموعه داده ورودی مبتنی بر فایل باشد اما تعداد پرونده ها از تعداد کارگران کمتر باشد ، یک InvalidArgumentError مطرح می شود. اگر این اتفاق افتاد ، به صراحت خط مشی AutoShardPolicy.DATA یا منبع ورودی خود را به پرونده های کوچکتر تقسیم کنید ، به طوری که تعداد پرونده ها از تعداد کارگران بیشتر باشد
  • FILE: اگر می خواهید پرونده های ورودی را روی همه کارگران خرد کنید ، این گزینه است. اگر تعداد پرونده های ورودی بسیار بیشتر از تعداد کارگران باشد و داده های موجود در پرونده ها به طور مساوی توزیع شود ، باید از این گزینه استفاده کنید. اگر داده های موجود در پرونده ها به طور یکنواخت توزیع نشوند ، نقطه ضعف این گزینه داشتن کارگران بیکار است. اگر تعداد پرونده ها از تعداد کارگران کمتر باشد ، یک InvalidArgumentError مطرح می شود. اگر این اتفاق افتاد ، صریحاً خط مشی را روی AutoShardPolicy.DATA . به عنوان مثال ، اجازه دهید 2 پرونده را بر روی 2 کارگر با هر تکرار توزیع کنیم. پرونده 1 شامل [0 ، 1 ، 2 ، 3 ، 4 ، 5] و پرونده 2 شامل [6 ، 7 ، 8 ، 9 ، 10 ، 11] است. اجازه دهید کل نسخه های همگام سازی شده 2 و اندازه دسته جهانی 4 باشد.

    • کارگر 0:
    • دسته 1 = ماکت 1: [0 ، 1]
    • دسته 2 = ماکت 1: [2 ، 3]
    • دسته 3 = ماکت 1: [4]
    • دسته 4 = ماکت 1: [5]
    • کارگر 1:
    • دسته 1 = ماکت 2: [6 ، 7]
    • دسته 2 = ماکت 2: [8 ، 9]
    • دسته 3 = ماکت 2: [10]
    • دسته 4 = ماکت 2: [11]
  • داده ها: با این کار عناصر موجود در تمام کارگران از بین می روند. هر یک از کارگران کل مجموعه داده را می خوانند و فقط خرده ریز اختصاص داده شده به آن را پردازش می کنند. همه خرده ریزهای دیگر کنار گذاشته می شوند. این معمولاً درصورتی استفاده می شود که تعداد پرونده های ورودی کمتر از تعداد کارگران باشد و شما می خواهید داده های بیشتری را در همه کارگران تقسیم کنید. نکته منفی این است که کل مجموعه داده برای هر کارگر خوانده می شود. به عنوان مثال ، اجازه دهید 1 پرونده را روی 2 کارگر توزیع کنیم. پرونده 1 شامل [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11] است. اجازه دهید تعداد کل ماکت های همگام سازی شده 2 باشد.

    • کارگر 0:
    • دسته 1 = ماکت 1: [0 ، 1]
    • دسته 2 = ماکت 1: [4 ، 5]
    • دسته 3 = ماکت 1: [8 ، 9]
    • کارگر 1:
    • دسته 1 = ماکت 2: [2 ، 3]
    • دسته 2 = ماکت 2: [6 ، 7]
    • دسته 3 = ماکت 2: [10 ، 11]
  • خاموش: اگر سختکاری خودکار را خاموش کنید ، هر کارگر تمام داده ها را پردازش می کند. به عنوان مثال ، اجازه دهید 1 پرونده را روی 2 کارگر توزیع کنیم. پرونده 1 شامل [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11] است. بگذارید تعداد کل ماکت های همگام سازی شده 2 باشد. سپس هر کارگر توزیع زیر را مشاهده می کند:

    • کارگر 0:
    • دسته 1 = ماکت 1: [0 ، 1]
    • دسته 2 = ماکت 1: [2 ، 3]
    • دسته 3 = ماکت 1: [4 ، 5]
    • دسته 4 = ماکت 1: [6 ، 7]
    • دسته 5 = ماکت 1: [8 ، 9]
    • دسته 6 = ماکت 1: [10 ، 11]

    • کارگر 1:

    • دسته 1 = ماکت 2: [0 ، 1]

    • دسته 2 = ماکت 2: [2 ، 3]

    • دسته 3 = ماکت 2: [4 ، 5]

    • دسته 4 = ماکت 2: [6 ، 7]

    • دسته 5 = ماکت 2: [8 ، 9]

    • دسته 6 = ماکت 2: [10 ، 11]

واکشی

به طور پیش فرض ، tf.distribute در انتهای کاربر ارائه شده نمونهtf.data.Dataset ، یک تغییر شکل پیش tf.distribute اضافه می کند. آرگومان تحول پیش فرض که اندازه buffer_size است با تعداد نسخه های همگام سازی برابر است.

tf.distribute.Strategy.distribute_datasets_from_function

استفاده

این API یک تابع ورودی را می گیرد و نمونه ای از tf.distribute.DistributedDataset برمی گرداند. تابع ورودی که کاربران در آن وارد می شوند دارای آرگومان tf.distribute.InputContext است و باید یک نمونهtf.data.Dataset . با این API ، tf.distribute تغییر دیگری در نمونهtf.data.Dataset کاربر که از عملکرد ورودی برگردانده شده ایجاد نمی کند. این وظیفه کاربر است که مجموعه داده ها را خرد و خرد کند. tf.distribute عملکرد ورودی دستگاه CPU هر یک از کارگران را فراخوانی می کند. این API جدا از اینکه به کاربران اجازه می دهد منطق دسته بندی و تقسیم بندی خود را مشخص کنند ، مقیاس پذیری و عملکرد بهتری را نیز در مقایسه با tf.distribute.Strategy.experimental_distribute_dataset هنگام استفاده برای آموزش چند کاره نشان می دهد.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

خواص

دسته بندی

نمونهtf.data.Dataset که مقدار برگشتی تابع ورودی است باید با استفاده از اندازه دسته ای هر ماکت دسته بندی شود. اندازه دسته هر ماکت ، اندازه دسته جهانی است که به تعداد نسخه هایی که در آموزش همگام سازی شرکت می کنند تقسیم می شود. این به این دلیل است که tf.distribute عملکرد ورودی دستگاه CPU هر یک از کارگران را فراخوانی می کند. مجموعه داده ای که روی یک کارگر مشخص ایجاد می شود باید برای همه نسخه های موجود در آن کارگر آماده باشد.

خرد کردن

شی tf.distribute.InputContext که به طور ضمنی به عنوان آرگومان به عملکرد ورودی کاربر منتقل می شود توسط tf.distribute در زیر کاپوت ایجاد می شود. این اطلاعات در مورد تعداد کارگران ، شناسه کارگر فعلی و غیره دارد. این تابع ورودی می تواند خرد کردن را طبق سیاست های تعیین شده توسط کاربر با استفاده از این ویژگی ها که بخشی از شی tf.distribute.InputContext دهد.

واکشی

tf.distribute در انتهایtf.data.Dataset که توسط کاربر تابع ورودی ارائه شده است ، یک تغییر پیش فرض اضافه نمی کند.

تکرار کنندگان توزیع شده

مشابه موارد غیر توزیع شدهtf.data.Dataset ، برای تکرار روی آن و دسترسی به عناصر موجود در tf.distribute.DistributedDataset ، باید یک تکرار کننده در tf.distribute.DistributedDataset ایجاد کنید. در زیر روشهایی وجود دارد که می توانید tf.distribute.DistributedIterator ایجاد tf.distribute.DistributedIterator و از آن برای آموزش مدل خود استفاده کنید:

موارد استفاده

برای ساخت حلقه از Pythonic استفاده کنید

برای تکرار در tf.distribute.DistributedDataset می توانید از یک حلقه کاربر پسند Pythonic استفاده کنید. عناصر برگشتی از tf.distribute.DistributedIterator می تواند یک tf.Tensor یا tf.distribute.DistributedValues که حاوی یک مقدار در هر نسخه است. قرار دادن حلقه در داخل یک tf.function باعث افزایش عملکرد می شود. با این حال ، break و return در حال حاضر برای حلقه ای روی tf.distribute.DistributedDataset که در داخل یک tf.function . قرار داده شده پشتیبانی نمی شود.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

از iter برای ایجاد یک تکرار کننده صریح استفاده کنید

برای تکرار عناصر موجود در نمونه tf.distribute.DistributedDataset ، می توانید یک tf.distribute.DistributedIterator با استفاده از API iter روی آن ایجاد کنید. با یک تکرار کننده صریح ، می توانید برای تعداد مشخصی از مراحل تکرار کنید. برای بدست آوردن عنصر بعدی از tf.distribute.DistributedIterator عنوان مثال dist_iterator می توانید با next(dist_iterator) ، dist_iterator.get_next() یا dist_iterator.get_next_as_optional() بگیرید. دو مورد قبلی اساساً یکسان هستند:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

اگر tf.distribute.DistributedIterator به پایان خود رسیده باشد ، با next() یا tf.distribute.DistributedIterator.get_next() ، یک خطای OutOfRange ظاهر می شود. مشتری می تواند خطا را در سمت پایتون بگیرد و به کارهای دیگر مانند بازرسی و ارزیابی ادامه دهد. با این حال ، اگر از حلقه آموزش میزبان استفاده می کنید (به عنوان مثال چندین مرحله در هر tf.function ) این کار نمی کند ، که به نظر می رسد:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn شامل چندین مرحله را طی قرار دادن بدن مرحله در داخل یک tf.range . در این حالت ، تکرارهای مختلف در حلقه و بدون وابستگی می توانند به طور موازی شروع شوند ، بنابراین یک خطای OutOfRange می تواند در تکرارهای بعدی قبل از پایان محاسبه تکرارهای قبلی ایجاد شود. هنگامی که خطای OutOfRange پرتاب شد ، همه گزینه های عملکرد بلافاصله خاتمه می یابند. اگر این موردی است که می خواهید از آن اجتناب کنید ، گزینه دیگری که خطای OutOfRange را ایجاد نمی کند tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional بازده یک tf.experimental.Optional که شامل عنصر بعدی و یا هیچ ارزش اگر tf.distribute.DistributedIterator به پایان رسیده است.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

استفاده از خاصیت element_spec

اگر شما از عناصر یک مجموعه داده های توزیع شده به tf.function و می خواهید یک tf.TypeSpec تضمین، شما می توانید مشخص کنید input_signature استدلال از tf.function . خروجی یک مجموعه داده توزیع شده tf.distribute.DistributedValues که می تواند ورودی یک دستگاه یا چندین دستگاه را نشان دهد. برای بدست آوردن tf.TypeSpec مربوط به این مقدار توزیع شده می توانید از خاصیت element_spec مجموعه داده توزیع شده یا شی تکرار کننده توزیع شده استفاده کنید.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

دسته های جزئی

هنگامی که مواردtf.data.Dataset که کاربران ایجاد می کنند ممکن است دارای اندازه های دسته ای باشند که به طور مساوی بر تعداد نسخه ها قابل تقسیم نیستند یا زمانی که ویژگی اصلی نمونه مجموعه داده بر اساس اندازه دسته قابل تقسیم نیست ، دسته های جزئی مشاهده می شود. این بدان معناست که وقتی مجموعه داده روی چندین نسخه کپی توزیع شود ، تماس next با برخی از تکرار کنندگان منجر به ایجاد خطای OutOfRangeError خواهد شد. برای رسیدگی به این مورد استفاده ، tf.distribute دسته های ساختگی از اندازه دسته 0 را بر روی ماکتهایی که اطلاعات بیشتری برای پردازش tf.distribute ، برمی گرداند.

در مورد تک کارگر ، اگر داده ها با تماس next در تکرار کننده بازگردانده نشوند ، دسته های ساختگی 0 دسته ای ایجاد می شوند و همراه با داده های واقعی در مجموعه داده استفاده می شوند. در مورد دسته های جزئی ، آخرین دسته داده جهانی شامل داده های واقعی در کنار دسته های ساختگی داده ها خواهد بود. شرط توقف پردازش داده ها اکنون بررسی می کند که آیا هر یک از نسخه ها داده دارند. اگر هیچ داده ای در مورد هر نسخه وجود نداشته باشد ، خطای OutOfRange ایجاد می شود.

برای حالت چند کارگر ، مقدار بولی که نمایانگر حضور داده ها در مورد هر یک از کارگران است با استفاده از ارتباطات متقابل متقابل جمع می شود و این برای شناسایی اینکه آیا همه کارگران پردازش مجموعه داده توزیع شده را به پایان رسانده اند استفاده می شود. از آنجا که این ارتباطات متقابل کارگر را شامل می شود ، مجازات عملکردی در بر دارد.

هشدارها

  • هنگام استفاده از API های tf.distribute.Strategy.experimental_distribute_dataset با چندین تنظیم کارگر ، کاربران از یکtf.data.Dataset عبور می کنند که از پرونده ها خوانده می شود. اگر tf.data.experimental.AutoShardPolicy روی AUTO یا FILE ، اندازه دسته واقعی هر مرحله ممکن است کوچکتر از اندازه دسته جهانی تعریف شده توسط کاربر باشد. این می تواند زمانی اتفاق بیفتد که عناصر باقی مانده در پرونده کمتر از اندازه دسته جهانی باشد. کاربران می توانند مجموعه داده ها را بدون اینکه به تعداد مراحل اجرا tf.data.experimental.AutoShardPolicy کنند یا tf.data.experimental.AutoShardPolicy روی DATA تنظیم tf.data.experimental.AutoShardPolicy تا در اطراف آن کار کند.

  • در حال حاضر تغییر شکل داده های داده با tf.distribute پشتیبانی نمی شوند و هرگونه tf.distribute که این مجموعه داده داشته باشد در حال حاضر نادیده گرفته می شود. به عنوان مثال ، اگر مجموعه داده شما دارای یک map_fn که از tf.random.uniform برای چرخاندن یک تصویر استفاده می کند ، در این صورت شما یک نمودار مجموعه داده دارید که به حالت (یعنی بذر تصادفی) دستگاه محلی بستگی دارد که روند پایتون در آن اجرا می شود.

  • tf.data.experimental.OptimizationOptions آزمایشی tf.data.experimental.OptimizationOptions هایی که به طور پیش فرض غیرفعال هستند می توانند در زمینه های خاصی - مانند استفاده همزمان با tf.distribute - باعث tf.distribute عملکرد شوند. فقط پس از اینکه تأیید کردید در عملکرد توزیع کار شما مفید هستند ، باید آنها را فعال کنید.

  • لطفاً برای چگونگی بهینه سازی خط لوله ورودی خود با tf.data به طور کلی به این راهنما مراجعه کنید. چند نکته اضافی:

    • اگر چندین کارگر دارید و از tf.data.Dataset.list_files برای ایجاد یک مجموعه داده از همه پرونده های مطابق با یک یا چند الگوی glob استفاده می کنید ، به یاد داشته باشید که آرگومان seed را تنظیم کنید یا shuffle=False تا هر کارگر پرونده را به طور مداوم خرد کند.

    • اگر خط لوله ورودی شما شامل هم زدن داده ها در سطح رکورد و هم تجزیه داده ها باشد ، مگر اینکه داده های تجزیه شده به طور قابل توجهی بزرگتر از داده های تجزیه شده باشد (که معمولاً چنین نیست) ، ابتدا آنرا مرتب کنید و سپس تجزیه کنید ، همانطور که در مثال زیر نشان داده شده است. این ممکن است به نفع استفاده و عملکرد حافظه باشد.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) بافر داخلی عناصر buffer_size حفظ می کند و بنابراین کاهش buffer_size می تواند مسئله OOM را برطرف کند.

  • ترتیب پردازش داده ها توسط کارگران هنگام استفاده از tf.distribute.experimental_distribute_dataset یا tf.distribute.distribute_datasets_from_function تضمین نمی شود. اگر از tf.distribute برای پیش بینی مقیاس استفاده می کنید ، این معمولاً لازم است. با این وجود می توانید برای هر عنصر یک شاخص را وارد کنید و خروجی ها را بر این اساس سفارش دهید. قطعه زیر نمونه ای از نحوه سفارش خروجی ها است.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

اگر از نمونه متعارف tf.data.Dataset استفاده نکنم ، چگونه می توانم داده های خود را توزیع کنم؟

بعضی اوقات کاربران نمی توانند از یکtf.data.Dataset برای نشان دادن ورودی خود و متعاقباً API های فوق الذکر برای توزیع مجموعه داده به چندین دستگاه استفاده کنند. در چنین مواردی می توانید از سنسورهای خام یا ورودی های یک ژنراتور استفاده کنید.

برای ورودی های تانسور دلخواه از تجربی_توزیع_مقادیر_ از_عمل استفاده کنید

strategy.run tf.distribute.DistributedValues که خروجی next(iterator) strategy.run می پذیرد. به تصویب ارزش تانسور، استفاده از experimental_distribute_values_from_function به ساختار tf.distribute.DistributedValues از تانسورها خام.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

اگر ورودی شما از یک مولد است از tf.data.Dataset.from_generator استفاده کنید

اگر یک عملکرد ژنراتور دارید که می خواهید استفاده کنید ، می توانید با استفاده از API from_generator یک نمونهtf.data.Dataset ایجاد کنید.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)