کمک به حفاظت از دیواره بزرگ مرجانی با TensorFlow در Kaggle اضافه کردن چالش

ورودی توزیع شده

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

tf.distribute رابط های برنامه کاربردی یک راه آسان برای کاربران به مقیاس های آموزشی خود را از یک ماشین به ماشین آلات مختلف فراهم می کند. هنگام مقیاس بندی مدل خود، کاربران همچنین باید ورودی خود را در چندین دستگاه توزیع کنند. tf.distribute فراهم می کند رابط های برنامه کاربردی با استفاده از آن میتوانید به صورت خودکار ورودی خود را در سراسر دستگاه های توزیع کنید.

این راهنما به شما در راه های مختلف که در آن شما می توانید مجموعه داده توزیع شده و تکرارکننده با استفاده از ایجاد نشان tf.distribute رابط های برنامه کاربردی. علاوه بر این، موضوعات زیر پوشش داده خواهد شد:

این راهنما استفاده از ورودی توزیع شده با Keras API را پوشش نمی دهد.

مجموعه داده های توزیع شده

برای استفاده از tf.distribute API های به مقیاس، توصیه می شود که کاربران با استفاده از tf.data.Dataset برای نشان ورودی خود را. tf.distribute ساخته شده است برای کار موثر با tf.data.Dataset (به عنوان مثال، واکشی اولیه خودکار از اطلاعات بر روی هر یک از دستگاه شتاب دهنده) با بهینه سازی عملکرد به طور منظم به اجرای گنجانیده شده است. اگر شما یک مورد استفاده برای استفاده از چیزی به غیر از tf.data.Dataset ، لطفا ارجاع به یک بعد بخش در این راهنما. در غیر توزیع حلقه آموزش، کاربران برای اولین بار یک ایجاد tf.data.Dataset مثال و سپس تکرار بیش از عناصر. مثلا:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

به کاربران اجازه استفاده از tf.distribute استراتژی با کمترین تغییرات در کد موجود کاربر، دو API معرفی شدند که می تواند یک توزیع tf.data.Dataset مثال و بازگشت یک مجموعه داده شی توزیع شده است. سپس یک کاربر می تواند روی این نمونه داده توزیع شده تکرار کند و مدل خود را مانند قبل آموزش دهد. اجازه دهید ما در حال حاضر در دو API نگاه - tf.distribute.Strategy.experimental_distribute_dataset و tf.distribute.Strategy.distribute_datasets_from_function با جزئیات بیشتر:

tf.distribute.Strategy.experimental_distribute_dataset

استفاده

این API طول می کشد یک tf.data.Dataset عنوان مثال به عنوان ورودی و یک گرداند tf.distribute.DistributedDataset به عنوان مثال. باید مجموعه داده ورودی را با مقداری برابر با اندازه دسته کلی دسته بندی کنید. این اندازه دسته کلی تعداد نمونه‌هایی است که می‌خواهید در همه دستگاه‌ها در 1 مرحله پردازش کنید. شما می توانید این مجموعه داده توزیع شده در یک مد پایتونی تکرار بیش از و یا ایجاد یک تکرارکننده با استفاده از iter . هدف بازگشت است نه tf.data.Dataset نمونه و هیچ رابط های برنامه کاربردی دیگر که تبدیل و یا بازرسی مجموعه داده را در هر راه را پشتیبانی نمی کند. اگر راه‌های خاصی ندارید که بخواهید ورودی خود را روی کپی‌های مختلف تقسیم کنید، این API توصیه‌شده است.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

خواص

دسته بندی

tf.distribute rebatches ورودی tf.data.Dataset عنوان مثال با اندازه دسته ای جدید است که به اندازه دسته ای جهانی تقسیم شده توسط تعدادی از کپی در هماهنگی برابر است. تعداد کپی‌های همگام‌سازی شده برابر است با تعداد دستگاه‌هایی که در گرادیان شرکت می‌کنند و در طول آموزش کاهش می‌یابند. هنگامی که یک کاربر می نامد next در تکرارکننده، هر ماکت اندازه دسته ای از داده ها در هر ماکت بازگشت توزیع شده است. کاردینالیته مجموعه داده مجدداً چند برابری از تعداد تکرارها خواهد بود. در اینجا چند نمونه وجود دارد:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • بدون توزیع:
    • دسته 1: [0، 1، 2، 3]
    • دسته 2: [4، 5]
    • با توزیع بیش از 2 ماکت. آخرین دسته ([4، 5]) بین 2 کپی تقسیم می شود.

    • دسته 1:

      • Replica 1:[0, 1]
      • Replica 2:[2, 3]
    • دسته 2:

      • ماکت 2: [4]
      • ماکت 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • بدون توزیع:
    • دسته 1: [[0]، [1]، [2]، [3]]
    • با توزیع بیش از 5 کپی:
    • دسته 1:
      • ماکت 1: [0]
      • ماکت 2: [1]
      • ماکت 3: [2]
      • ماکت 4: [3]
      • ماکت 5: []
  • tf.data.Dataset.range(8).batch(4)

    • بدون توزیع:
    • دسته 1: [0، 1، 2، 3]
    • دسته 2: [4، 5، 6، 7]
    • با توزیع بیش از 3 کپی:
    • دسته 1:
      • ماکت 1: [0، 1]
      • ماکت 2: [2، 3]
      • ماکت 3: []
    • دسته 2:
      • ماکت 1: [4، 5]
      • ماکت 2: [6، 7]
      • ماکت 3: []

بازگردانی مجموعه داده دارای پیچیدگی فضایی است که به صورت خطی با تعداد تکرارها افزایش می یابد. این بدان معنی است که برای استفاده از آموزش چند کارگری، خط لوله ورودی می تواند با خطاهای OOM مواجه شود.

شاردینگ

tf.distribute همچنین autoshards مجموعه داده های ورودی در آموزش کارگران چند با MultiWorkerMirroredStrategy و TPUStrategy . هر مجموعه داده بر روی دستگاه CPU کارگر ایجاد می شود. Autosharding یک مجموعه داده بیش از یک مجموعه از وسایل کارگران که هر کارگر تخصیص داده شده است یک زیر مجموعه از کل مجموعه داده (اگر حق tf.data.experimental.AutoShardPolicy تنظیم شده است). این برای اطمینان از این است که در هر مرحله، یک اندازه دسته کلی از عناصر داده غیرهمپوشانی توسط هر کارگر پردازش می‌شود. Autosharding یک زن و شوهر از گزینه های مختلف است که می تواند با استفاده از مشخص tf.data.experimental.DistributeOptions . توجه داشته باشید این است که هیچ autosharding در عرصه آموزش کارمندان چند با وجود ParameterServerStrategy ، و کسب اطلاعات بیشتر در مجموعه داده ایجاد با این استراتژی می توان در یافت آموزش پارامتر سرور استراتژی .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

سه گزینه مختلف است که شما می توانید برای مجموعه وجود دارد tf.data.experimental.AutoShardPolicy :

  • AUTO: این گزینه پیش فرض است که به این معنی است که سعی می شود توسط FILE به اشتراک گذاشته شود. اگر یک مجموعه داده مبتنی بر فایل شناسایی نشود، تلاش برای خرد کردن توسط FILE با شکست مواجه می‌شود. tf.distribute سپس سقوط خواهد کرد به sharding توسط داده. توجه کنید که اگر مجموعه داده ورودی مبتنی بر فایل است اما تعدادی از فایل های کمتر از تعداد کارگران است، یک InvalidArgumentError مطرح خواهد شد. در این صورت، به صراحت تعیین سیاست به AutoShardPolicy.DATA ، و یا منبع ورودی خود را به فایلهای کوچکتر تقسیم به طوری که تعدادی از فایل های بیشتر از تعداد کارگران است.
  • FILE: این گزینه ای است که می خواهید فایل های ورودی را روی همه کارگران خرد کنید. اگر تعداد فایل های ورودی بسیار بیشتر از تعداد کارگران است و داده های موجود در فایل ها به طور مساوی توزیع شده اند، باید از این گزینه استفاده کنید. نقطه ضعف این گزینه داشتن کارگران بیکار است در صورتی که داده های موجود در فایل ها به طور یکنواخت توزیع نشده باشند. اگر تعداد فایل های کمتر از تعداد کارگران است، یک InvalidArgumentError مطرح خواهد شد. اگر این اتفاق می افتد، به صراحت سیاست به مجموعه ای AutoShardPolicy.DATA . به عنوان مثال، اجازه دهید 2 فایل را روی 2 کارگر با 1 ماکت توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5] و فایل 2 حاوی [6، 7، 8، 9، 10، 11] است. اجازه دهید تعداد کل کپی ها در همگام سازی 2 و اندازه دسته جهانی 4 باشد.

    • کارگر 0:
    • دسته 1 = ماکت 1: [0، 1]
    • دسته 2 = ماکت 1: [2، 3]
    • دسته 3 = ماکت 1: [4]
    • دسته 4 = ماکت 1: [5]
    • کارگر 1:
    • دسته 1 = ماکت 2: [6، 7]
    • دسته 2 = ماکت 2: [8، 9]
    • دسته 3 = ماکت 2: [10]
    • دسته 4 = ماکت 2: [11]
  • DATA: این کار عناصر را در همه کارگران به صورت خودکار خرد می کند. هر یک از کارگران کل مجموعه داده را می خوانند و فقط قطعه اختصاص داده شده به آن را پردازش می کنند. تمام خرده های دیگر دور ریخته خواهند شد. این معمولاً در صورتی استفاده می شود که تعداد فایل های ورودی کمتر از تعداد کارگران باشد و به اشتراک گذاری بهتر داده ها در همه کارگران بخواهید. نکته منفی این است که کل مجموعه داده در هر کارگر خوانده می شود. به عنوان مثال، اجازه دهید 1 فایل را روی 2 کارگر توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5، 6، 7، 8، 9، 10، 11] است. تعداد کل کپی های همگام شده را 2 عدد بگذارید.

    • کارگر 0:
    • دسته 1 = ماکت 1: [0، 1]
    • دسته 2 = ماکت 1: [4، 5]
    • دسته 3 = ماکت 1: [8، 9]
    • کارگر 1:
    • دسته 1 = ماکت 2: [2، 3]
    • دسته 2 = ماکت 2: [6، 7]
    • دسته 3 = ماکت 2: [10، 11]
  • OFF: اگر Autosharding را خاموش کنید، هر کارگر تمام داده ها را پردازش می کند. به عنوان مثال، اجازه دهید 1 فایل را روی 2 کارگر توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5، 6، 7، 8، 9، 10، 11] است. اجازه دهید تعداد کل کپی‌های همگام ۲ باشد. سپس هر کارگر توزیع زیر را می‌بیند:

    • کارگر 0:
    • دسته 1 = ماکت 1: [0، 1]
    • دسته 2 = ماکت 1: [2، 3]
    • دسته 3 = ماکت 1: [4، 5]
    • دسته 4 = ماکت 1: [6، 7]
    • دسته 5 = ماکت 1: [8، 9]
    • دسته 6 = ماکت 1: [10، 11]

    • کارگر 1:

    • دسته 1 = ماکت 2: [0، 1]

    • دسته 2 = ماکت 2: [2، 3]

    • دسته 3 = ماکت 2: [4، 5]

    • دسته 4 = ماکت 2: [6، 7]

    • دسته 5 = ماکت 2: [8، 9]

    • دسته 6 = ماکت 2: [10، 11]

پیش واکشی

به طور پیش فرض، tf.distribute می افزاید: یک تحول واکشی اولیه در پایان کاربر ارائه tf.data.Dataset به عنوان مثال. استدلال به تغییر و تحول واکشی اولیه است که buffer_size به تعداد کپی در هماهنگی برابر است.

tf.distribute.Strategy.distribute_datasets_from_function

استفاده

این API طول می کشد یک تابع ورودی و یک گرداند tf.distribute.DistributedDataset به عنوان مثال. تابع ورودی که کاربران در عبور است tf.distribute.InputContext استدلال و باید یک بازگشت tf.data.Dataset به عنوان مثال. با استفاده از این API، tf.distribute هیچ تغییرات بیشتر به کاربر را tf.data.Dataset بازگشت به عنوان مثال از تابع ورودی. مسئولیت دسته بندی و خرد کردن مجموعه داده ها بر عهده کاربر است. tf.distribute تابع ورودی بر روی دستگاه CPU از هر یک از کارگران می نامد. به غیر از کاربران اجازه می دهد مشخص خود بچینگ و sharding منطق خود را، این API نیز نشان می دهد مقیاس پذیری و عملکرد بهتر نسبت به tf.distribute.Strategy.experimental_distribute_dataset زمانی که برای آموزش چند کارگر استفاده می شود.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

خواص

دسته بندی

tf.data.Dataset عنوان مثال این است که مقدار بازگشتی این تابع ورودی باید با استفاده از هر ماکت اندازه دسته بسته بندی های کوچک. اندازه دسته ای هر ماکت، اندازه دسته جهانی تقسیم بر تعداد کپی هایی است که در آموزش همگام سازی شرکت می کنند. دلیل این است که tf.distribute تابع ورودی بر روی دستگاه CPU از هر یک از کارگران می نامد. مجموعه داده ای که روی یک کارگر معین ایجاد می شود باید برای استفاده توسط همه کپی های آن کارگر آماده باشد.

شاردینگ

tf.distribute.InputContext شی است که به طور ضمنی به عنوان یک آرگومان به تابع ورودی کاربر به تصویب می رسد با ایجاد tf.distribute در زیر کاپوت. این اطلاعات در مورد تعداد کارگران، شناسه کارگر فعلی و غیره این تابع ورودی را می sharding به عنوان در هر سیاست تعیین شده توسط کاربر با استفاده از این خواص است که بخشی از مسئولیت رسیدگی به tf.distribute.InputContext شی.

پیش واکشی

tf.distribute یک تحول واکشی اولیه در پایان اضافه کنید از tf.data.Dataset توسط کاربر ارائه تابع ورودی بازگشت.

تکرار کننده های توزیع شده

مشابه به غیر توزیع tf.data.Dataset موارد، شما نیاز به ایجاد یک تکرارکننده در tf.distribute.DistributedDataset موارد به تکرار بیش از آن و دسترسی عناصر در tf.distribute.DistributedDataset . موارد زیر راه هایی که شما می توانید یک ایجاد می tf.distribute.DistributedIterator و استفاده از آن برای آموزش مدل شما:

موارد استفاده

از ساختار حلقه پایتونیک برای حلقه استفاده کنید

شما می توانید یک کاربر پسند حلقه پایتونی به تکرار بیش از استفاده از tf.distribute.DistributedDataset . عناصر بازگشت از tf.distribute.DistributedIterator می تواند یک tf.Tensor یا tf.distribute.DistributedValues که شامل یک مقدار در هر ماکت. قرار دادن حلقه در داخل یک tf.function خواهد افزایش عملکرد است. با این حال، break و return در حال حاضر برای یک حلقه بیش از یک پشتیبانی نمی tf.distribute.DistributedDataset است که در داخل یک قرار داده tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

استفاده از iter برای ایجاد یک تکرارکننده صریح

به تکرار بیش از عناصر در یک tf.distribute.DistributedDataset عنوان مثال، شما می توانید یک tf.distribute.DistributedIterator با استفاده از iter API بر روی آن. با یک تکرار کننده صریح، می توانید برای تعداد ثابتی از مراحل تکرار کنید. به منظور دریافت عنصر بعدی از tf.distribute.DistributedIterator مثال dist_iterator ، شما می توانید پاسخ next(dist_iterator) ، dist_iterator.get_next() ، و یا dist_iterator.get_next_as_optional() . دو مورد قبلی در اصل یکسان هستند:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

با next() و یا tf.distribute.DistributedIterator.get_next() ، اگر tf.distribute.DistributedIterator به پایان رسیده است، یک خطا OutOfRange رخ می دهد. مشتری می تواند خطا را در سمت پایتون بگیرد و به انجام کارهای دیگر مانند چک پوینت و ارزیابی ادامه دهد. با این حال، این کار نخواهد کرد اگر شما با استفاده از یک حلقه آموزش میزبان (به عنوان مثال، اجرا مراحل متعدد در هر tf.function )، که به نظر میرسد:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn شامل چندین مرحله را طی قرار دادن بدن مرحله در داخل یک tf.range . در این مورد، تکرارهای مختلف در حلقه بدون وابستگی می‌توانند به صورت موازی شروع شوند، بنابراین یک خطای OutOfRange می‌تواند در تکرارهای بعدی قبل از اتمام محاسبه تکرارهای قبلی، ایجاد شود. هنگامی که یک خطای OutOfRange پرتاب می شود، تمام عملیات های تابع فوراً خاتمه می یابد. اگر این برخی از مورد که شما می خواهم برای جلوگیری از است، یک جایگزین می کند که یک خطای OutOfRange پرتاب نمی باشد tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional بازده یک tf.experimental.Optional که شامل عنصر بعدی و یا هیچ ارزش اگر tf.distribute.DistributedIterator به پایان رسیده است.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

با استفاده از element_spec اموال

اگر شما از عناصر یک مجموعه داده های توزیع شده به tf.function و می خواهید یک tf.TypeSpec تضمین، شما می توانید مشخص کنید input_signature استدلال از tf.function . خروجی یک مجموعه داده توزیع شده است tf.distribute.DistributedValues که می تواند ورودی به یک دستگاه واحد یا دستگاه های متعدد را نشان دهد. برای به دست آوردن tf.TypeSpec مربوط به این مقدار توزیع شما می توانید با استفاده از element_spec اموال از مجموعه داده های توزیع یا توزیع شی تکرارکننده.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

دسته های جزئی

دسته جزئی مواجه می شوند که tf.data.Dataset موارد که کاربران ایجاد ممکن است حاوی اندازه دسته که به طور مساوی شده توسط تعدادی از کپی و یا وقتی که کاردینالیتی عنوان مثال مجموعه داده است توسط بخش اندازه دسته نیست بخش پذیر نیست. به این معنی که زمانی که مجموعه داده است که بیش از کپی های متعدد توزیع شده، next پاسخ در برخی از تکرارکننده در OutOfRangeError منجر خواهد شد. برای رسیدگی به این پرونده، tf.distribute بازده ساختگی دسته از اندازه دسته 0 در کپی که هر گونه اطلاعات بیشتر به فرایند ندارد.

برای مورد کارگر تنها، اگر داده ها توسط برگردانده نمی next پاسخ در تکرارکننده، دسته ساختگی از 0 اندازه دسته تولید می شوند و مورد استفاده قرار همراه با داده های واقعی در مجموعه داده. در مورد دسته‌های جزئی، آخرین دسته جهانی داده‌ها شامل داده‌های واقعی در کنار دسته‌های ساختگی داده‌ها خواهد بود. شرایط توقف برای پردازش داده ها اکنون بررسی می کند که آیا هر یک از ماکت ها داده ای دارند یا خیر. اگر هیچ داده ای در مورد هیچ یک از کپی ها وجود نداشته باشد، یک خطای OutOfRange پرتاب می شود.

برای مورد چند کارگری، مقدار بولی که نشان دهنده حضور داده ها در هر یک از کارگران است با استفاده از ارتباط تکراری متقابل جمع می شود و این برای شناسایی اینکه آیا همه کارگران پردازش مجموعه داده توزیع شده را به پایان رسانده اند استفاده می شود. از آنجایی که این شامل ارتباط متقابل کارگری است، جریمه عملکردی نیز در بر دارد.

هشدارها

  • هنگام استفاده از tf.distribute.Strategy.experimental_distribute_dataset رابط های برنامه کاربردی با راه اندازی کارگر متعدد، کاربران تصویب tf.data.Dataset که از فایل های می خواند. اگر tf.data.experimental.AutoShardPolicy تنظیم شده است AUTO یا FILE ، واقعی در هر مرحله اندازه دسته ممکن است کوچکتر از کاربر تعریف شده اندازه دسته جهانی است. این می تواند زمانی اتفاق بیفتد که عناصر باقی مانده در فایل کمتر از اندازه دسته ای جهانی باشند. کاربران میتوانند مجموعه داده بدون توجه به تعداد مراحل به اجرا و یا مجموعه ای اگزوز tf.data.experimental.AutoShardPolicy به DATA به کار در اطراف آن.

  • تحولات مجموعه داده stateful به در حال حاضر با پشتیبانی نمی tf.distribute و هر گونه عملیات stateful به که مجموعه داده ممکن است در حال حاضر استفاده کنه. برای مثال، اگر شما دارای یک مجموعه داده map_fn که با استفاده از tf.random.uniform به چرخش تصویر، پس از آن شما یک نمودار مجموعه داده است که بستگی به دولت (یعنی دانه تصادفی) در ماشین محلی که در آن فرآیند پایتون در حال اجرا.

  • تجربی tf.data.experimental.OptimizationOptions که به طور پیش فرض می تواند در زمینه های خاصی غیر فعال - مانند زمانی که همراه با استفاده tf.distribute - علت تخریب عملکرد. شما باید آنها را فقط پس از تأیید اعتبار آنها فعال کنید که عملکرد حجم کاری شما در یک تنظیم توزیع مفید است.

  • لطفا به این راهنمای برای چگونگی بهینه سازی خط لوله ورودی خود را با tf.data به طور کلی. چند نکته اضافی:

    • اگر شما کارگران متعدد و با استفاده از tf.data.Dataset.list_files برای ایجاد یک مجموعه داده از تمام فایل های تطبیق یک یا چند الگوهای جانشین، به یاد داشته باشید به مجموعه ای از seed استدلال و یا مجموعه ای shuffle=False به طوری که هر کارگر سفال فایل به طور مداوم.

    • اگر خط لوله ورودی شما شامل مخلوط کردن داده ها در سطح رکورد و تجزیه داده ها می شود، مگر اینکه داده های تجزیه نشده به طور قابل توجهی بزرگتر از داده های تجزیه شده باشد (که معمولاً اینطور نیست)، ابتدا مخلوط کنید و سپس تجزیه کنید، همانطور که در مثال زیر نشان داده شده است. این ممکن است برای استفاده و عملکرد حافظه مفید باشد.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) حفظ یک بافر داخلی از buffer_size عناصر، و در نتیجه کاهش buffer_size می تواند مسئله OOM aleviate.

  • منظور که در آن داده ها توسط کارگران پردازش در هنگام استفاده از tf.distribute.experimental_distribute_dataset یا tf.distribute.distribute_datasets_from_function تضمین شده نیست. این طور معمول لازم است اگر شما با استفاده tf.distribute به پیش بینی مقیاس. با این حال می‌توانید برای هر عنصر در دسته یک شاخص درج کنید و خروجی‌ها را بر اساس آن سفارش دهید. قطعه زیر نمونه ای از نحوه سفارش خروجی ها است.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

اگر از نمونه canonical tf.data.Dataset استفاده نمی کنم، چگونه داده های خود را توزیع کنم؟

گاهی اوقات کاربران می توانند یک استفاده نمی tf.data.Dataset برای نشان ورودی خود و پس از آن رابط های برنامه کاربردی در بالا ذکر شد برای توزیع مجموعه داده به دستگاه های متعدد. در چنین مواردی می توانید از تانسورهای خام یا ورودی های یک ژنراتور استفاده کنید.

برای ورودی‌های تانسور دلخواه، از تابع Experimental_distribute_values_from_استفاده کنید

strategy.run پذیرد tf.distribute.DistributedValues است که خروجی next(iterator) . به تصویب ارزش تانسور، استفاده از experimental_distribute_values_from_function به ساختار tf.distribute.DistributedValues از تانسورها خام.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

اگر ورودی شما از یک ژنراتور است، از tf.data.Dataset.from_generator استفاده کنید

اگر شما یک تابع مولد را که می خواهید استفاده کنید، شما می توانید یک tf.data.Dataset عنوان مثال با استفاده از from_generator API.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)