ورودی توزیع شده

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

API های tf.distribute راه آسانی را برای کاربران فراهم می کند تا آموزش خود را از یک ماشین به چندین ماشین مقیاس کنند. هنگام مقیاس بندی مدل خود، کاربران همچنین باید ورودی خود را در چندین دستگاه توزیع کنند. tf.distribute API هایی را ارائه می دهد که با استفاده از آنها می توانید ورودی خود را به طور خودکار در بین دستگاه ها توزیع کنید.

این راهنما راه‌های مختلفی را به شما نشان می‌دهد که از طریق آن می‌توانید مجموعه داده‌ها و تکرارکننده‌های توزیع شده را با استفاده از tf.distribute API ایجاد کنید. علاوه بر این، موضوعات زیر پوشش داده خواهد شد:

این راهنما استفاده از ورودی توزیع شده با Keras API را پوشش نمی دهد.

مجموعه داده های توزیع شده

برای استفاده از tf.distribute APIها در مقیاس، توصیه می شود که کاربران از tf.data.Dataset برای نمایش ورودی خود استفاده کنند. tf.distribute به گونه ای ساخته شده است که با tf.data.Dataset (به عنوان مثال، واکشی اولیه خودکار داده ها روی هر دستگاه شتاب دهنده) با بهینه سازی عملکرد به طور منظم در پیاده سازی کار کند. اگر مورد استفاده برای استفاده از چیزی غیر از tf.data.Dataset دارید، لطفاً به بخش بعدی در این راهنما مراجعه کنید. در یک حلقه آموزشی غیر توزیع شده، کاربران ابتدا یک نمونه tf.data.Dataset ایجاد می کنند و سپس روی عناصر تکرار می کنند. مثلا:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

برای اینکه کاربران بتوانند از استراتژی tf.distribute با حداقل تغییرات در کد موجود کاربر استفاده کنند، دو API معرفی شدند که یک نمونه tf.data.Dataset را توزیع کرده و یک شیء مجموعه داده توزیع شده را برمی گرداند. سپس یک کاربر می تواند روی این نمونه داده توزیع شده تکرار کند و مدل خود را مانند قبل آموزش دهد. اجازه دهید اکنون به دو API - tf.distribute.Strategy.experimental_distribute_dataset و tf.distribute.Strategy.distribute_datasets_from_function با جزئیات بیشتر نگاه کنیم:

tf.distribute.Strategy.experimental_distribute_dataset

استفاده

این API یک نمونه tf.data.Dataset را به عنوان ورودی می گیرد و یک نمونه tf.distribute.DistributedDataset را برمی گرداند. باید مجموعه داده ورودی را با مقداری برابر با اندازه دسته کلی دسته بندی کنید. این اندازه دسته کلی تعداد نمونه‌هایی است که می‌خواهید در همه دستگاه‌ها در یک مرحله پردازش کنید. می توانید روی این مجموعه داده توزیع شده به روش پایتونیک تکرار کنید یا با استفاده از iter یک تکرار کننده ایجاد کنید. شیء برگشتی یک نمونه tf.data.Dataset نیست و از هیچ API دیگری که به هیچ وجه مجموعه داده را تبدیل یا بازرسی می کند، پشتیبانی نمی کند. اگر راه‌های خاصی ندارید که بخواهید ورودی خود را روی کپی‌های مختلف تقسیم کنید، این API توصیه می‌شود.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

خواص

دسته بندی

tf.distribute نمونه ورودی tf.data.Dataset را با یک اندازه دسته جدید که برابر است با اندازه دسته کلی تقسیم بر تعداد کپی های همگام، مجدداً جمع می کند. تعداد کپی‌های همگام‌سازی شده برابر است با تعداد دستگاه‌هایی که در گرادیان شرکت می‌کنند و در طول آموزش کاهش می‌یابند. هنگامی که کاربر با تکرار کننده توزیع شده تماس next را می گیرد، اندازه دسته ای از داده ها به ازای هر ماکت بر روی هر ماکت برگردانده می شود. کاردینالیته مجموعه داده مجدداً چند برابری از تعداد تکرارها خواهد بود. در اینجا چند نمونه آورده شده است:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • بدون توزیع:
    • دسته 1: [0، 1، 2، 3]
    • دسته 2: [4، 5]
    • با توزیع بیش از 2 ماکت. آخرین دسته ([4، 5]) بین 2 کپی تقسیم می شود.

    • دسته 1:

      • Replica 1:[0, 1]
      • Replica 2:[2, 3]
    • دسته 2:

      • ماکت 2: [4]
      • ماکت 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • بدون توزیع:
    • دسته 1: [[0]، [1]، [2]، [3]]
    • با توزیع بیش از 5 کپی:
    • دسته 1:
      • ماکت 1: [0]
      • ماکت 2: [1]
      • ماکت 3: [2]
      • ماکت 4: [3]
      • ماکت 5: []
  • tf.data.Dataset.range(8).batch(4)

    • بدون توزیع:
    • دسته 1: [0، 1، 2، 3]
    • دسته 2: [4، 5، 6، 7]
    • با توزیع بیش از 3 کپی:
    • دسته 1:
      • ماکت 1: [0، 1]
      • ماکت 2: [2، 3]
      • ماکت 3: []
    • دسته 2:
      • ماکت 1: [4، 5]
      • ماکت 2: [6، 7]
      • ماکت 3: []

بازگردانی مجموعه داده دارای پیچیدگی فضایی است که به صورت خطی با تعداد کپی ها افزایش می یابد. این بدان معنی است که برای استفاده از آموزش چند کارگری، خط لوله ورودی می تواند با خطاهای OOM مواجه شود.

شاردینگ

tf.distribute همچنین مجموعه داده ورودی را در آموزش چند کارگری با MultiWorkerMirroredStrategy و TPUStrategy می کند. هر مجموعه داده بر روی دستگاه CPU کارگر ایجاد می شود. اشتراک گذاری خودکار یک مجموعه داده روی مجموعه ای از کارگران به این معنی است که به هر کارگر زیر مجموعه ای از کل مجموعه داده اختصاص داده می شود (اگر tf.data.experimental.AutoShardPolicy سمت راست تنظیم شده باشد). این برای اطمینان از این است که در هر مرحله، یک اندازه دسته ای جهانی از عناصر داده غیرهمپوشانی توسط هر کارگر پردازش می شود. Autosharding چند گزینه مختلف دارد که می توان آنها را با استفاده از tf.data.experimental.DistributeOptions مشخص کرد. توجه داشته باشید که در آموزش چند کارگری با ParameterServerStrategy ، هیچ autosharding وجود ندارد و اطلاعات بیشتر در مورد ایجاد مجموعه داده با این استراتژی را می‌توانید در آموزش استراتژی Parameter Server بیابید .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

سه گزینه مختلف وجود دارد که می توانید برای tf.data.experimental.AutoShardPolicy تنظیم کنید:

  • AUTO: این گزینه پیش فرض است که به این معنی است که سعی می شود توسط FILE به اشتراک گذاشته شود. اگر یک مجموعه داده مبتنی بر فایل شناسایی نشود، تلاش برای خرد کردن توسط FILE با شکست مواجه می‌شود. سپس tf.distribute به اشتراک گذاری توسط DATA برمی گردد. توجه داشته باشید که اگر مجموعه داده ورودی مبتنی بر فایل باشد اما تعداد فایل‌ها کمتر از تعداد کارگران باشد، یک InvalidArgumentError . اگر این اتفاق افتاد، صراحتاً خط‌مشی را روی AutoShardPolicy.DATA تنظیم کنید، یا منبع ورودی خود را به فایل‌های کوچک‌تر تقسیم کنید تا تعداد فایل‌ها از تعداد کارگران بیشتر باشد.
  • FILE: این گزینه ای است که می خواهید فایل های ورودی را روی همه کارگران خرد کنید. اگر تعداد فایل های ورودی بسیار بیشتر از تعداد کارگران است و داده های موجود در فایل ها به طور مساوی توزیع شده اند، باید از این گزینه استفاده کنید. نقطه ضعف این گزینه وجود کارگران بیکار است در صورتی که داده ها در فایل ها به طور مساوی توزیع نشده باشند. اگر تعداد فایل ها کمتر از تعداد کارگران باشد، یک InvalidArgumentError مطرح می شود. اگر این اتفاق افتاد، صراحتاً خط‌مشی را روی AutoShardPolicy.DATA تنظیم کنید. به عنوان مثال، اجازه دهید 2 فایل را روی 2 کارگر با 1 ماکت توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5] و فایل 2 حاوی [6، 7، 8، 9، 10، 11] است. اجازه دهید تعداد کل کپی‌ها به صورت همگام 2 و اندازه دسته جهانی 4 باشد.

    • کارگر 0:
    • دسته 1 = ماکت 1: [0، 1]
    • دسته 2 = ماکت 1: [2، 3]
    • دسته 3 = ماکت 1: [4]
    • دسته 4 = ماکت 1: [5]
    • کارگر 1:
    • دسته 1 = ماکت 2: [6، 7]
    • دسته 2 = ماکت 2: [8، 9]
    • دسته 3 = ماکت 2: [10]
    • دسته 4 = ماکت 2: [11]
  • DATA: این کار عناصر را در همه کارگران به صورت خودکار خرد می کند. هر یک از کارگران کل مجموعه داده را می خوانند و فقط قطعه اختصاص داده شده به آن را پردازش می کنند. تمام خرده های دیگر دور ریخته خواهند شد. این معمولاً در صورتی استفاده می‌شود که تعداد فایل‌های ورودی کمتر از تعداد کارگران باشد و بخواهید اشتراک‌گذاری بهتری از داده‌ها در همه کارگران داشته باشید. نکته منفی این است که کل مجموعه داده در هر کارگر خوانده می شود. به عنوان مثال، اجازه دهید 1 فایل را روی 2 کارگر توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5، 6، 7، 8، 9، 10، 11] است. تعداد کل کپی های همگام شده را 2 عدد بگذارید.

    • کارگر 0:
    • دسته 1 = ماکت 1: [0، 1]
    • دسته 2 = ماکت 1: [4، 5]
    • دسته 3 = ماکت 1: [8، 9]
    • کارگر 1:
    • دسته 1 = ماکت 2: [2، 3]
    • دسته 2 = ماکت 2: [6، 7]
    • دسته 3 = ماکت 2: [10، 11]
  • OFF: اگر Autosharding را خاموش کنید، هر کارگر تمام داده ها را پردازش می کند. به عنوان مثال، اجازه دهید 1 فایل را روی 2 کارگر توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5، 6، 7، 8، 9، 10، 11] است. بگذارید تعداد کل کپی‌های همگام‌سازی شده 2 باشد. سپس هر کارگر توزیع زیر را می‌بیند:

    • کارگر 0:
    • دسته 1 = ماکت 1: [0، 1]
    • دسته 2 = ماکت 1: [2، 3]
    • دسته 3 = ماکت 1: [4، 5]
    • دسته 4 = ماکت 1: [6، 7]
    • دسته 5 = ماکت 1: [8، 9]
    • دسته 6 = ماکت 1: [10، 11]

    • کارگر 1:

    • دسته 1 = ماکت 2: [0، 1]

    • دسته 2 = ماکت 2: [2، 3]

    • دسته 3 = ماکت 2: [4، 5]

    • دسته 4 = ماکت 2: [6، 7]

    • دسته 5 = ماکت 2: [8، 9]

    • دسته 6 = ماکت 2: [10، 11]

پیش واکشی

به‌طور پیش‌فرض، tf.distribute یک تبدیل واکشی پیش‌فرض در انتهای نمونه tf.data.Dataset ارائه‌شده توسط کاربر اضافه می‌کند. آرگومان تبدیل پیش واکشی که buffer_size است برابر با تعداد تکرارهای همگام است.

tf.distribute.Strategy.distribute_datasets_from_function

استفاده

این API یک تابع ورودی می گیرد و یک نمونه tf.distribute.DistributedDataset را برمی گرداند. تابع ورودی که کاربران در آن ارسال می کنند دارای آرگومان tf.distribute.InputContext است و باید یک نمونه tf.data.Dataset را برگرداند. با این API، tf.distribute هیچ تغییر دیگری در نمونه tf.data.Dataset کاربر که از تابع ورودی برگردانده شده است، ایجاد نمی کند. مسئولیت دسته بندی و خرد کردن مجموعه داده ها بر عهده کاربر است. tf.distribute تابع ورودی را در دستگاه CPU هر یک از کارگران فراخوانی می کند. جدا از اینکه به کاربران اجازه می‌دهد منطق دسته‌بندی و اشتراک‌گذاری خود را مشخص کنند، این API مقیاس‌پذیری و عملکرد بهتری را در مقایسه با tf.distribute.Strategy.experimental_distribute_dataset که برای آموزش چند کارگری استفاده می‌شود.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

خواص

دسته بندی

نمونه tf.data.Dataset که مقدار بازگشتی تابع ورودی است، باید با استفاده از اندازه دسته‌ای هر تکرار دسته‌بندی شود. اندازه دسته‌ای هر ماکت، اندازه دسته جهانی تقسیم بر تعداد کپی‌هایی است که در آموزش همگام‌سازی شرکت می‌کنند. این به این دلیل است که tf.distribute تابع ورودی را در دستگاه CPU هر یک از کارگران فراخوانی می کند. مجموعه داده ای که روی یک کارگر معین ایجاد می شود باید برای استفاده توسط همه کپی های آن کارگر آماده باشد.

شاردینگ

شی tf.distribute.InputContext که به طور ضمنی به عنوان آرگومان به تابع ورودی کاربر ارسال می شود توسط tf.distribute در زیر هود ایجاد می شود. این تابع اطلاعاتی در مورد تعداد کارگران، شناسه کارگر فعلی و غیره دارد. این تابع ورودی می‌تواند طبق خط‌مشی‌هایی که کاربر با استفاده از این ویژگی‌ها که بخشی از شی tf.distribute.InputContext هستند، تقسیم‌بندی را انجام دهد.

پیش واکشی

tf.distribute یک تبدیل پیش واکشی در انتهای tf.data.Dataset که توسط تابع ورودی ارائه شده توسط کاربر برگردانده شده است، اضافه نمی کند.

تکرار کننده های توزیع شده

مشابه نمونه‌های غیر توزیع‌شده tf.data.Dataset ، باید یک تکرارکننده در نمونه‌های tf.distribute.DistributedDataset ایجاد کنید تا روی آن تکرار شود و به عناصر موجود در tf.distribute.DistributedDataset دسترسی داشته باشید. در زیر روش هایی وجود دارد که می توانید یک tf.distribute.DistributedIterator ایجاد کنید و از آن برای آموزش مدل خود استفاده کنید:

موارد استفاده

از ساختار حلقه پایتونیک برای حلقه استفاده کنید

می توانید از یک حلقه پایتونیک کاربر پسند برای تکرار روی tf.distribute.DistributedDataset استفاده کنید. عناصر برگردانده شده از tf.distribute.DistributedIterator می توانند یک tf.Tensor یا یک tf.distribute.DistributedValues که حاوی یک مقدار برای هر ماکت است. قرار دادن حلقه در داخل یک tf.function باعث افزایش عملکرد می شود. با این حال، break و return در حال حاضر برای یک حلقه روی tf.distribute.DistributedDataset که در داخل یک tf.function قرار می گیرد، پشتیبانی نمی شود.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

از iter برای ایجاد یک تکرار کننده صریح استفاده کنید

برای تکرار روی عناصر در یک نمونه tf.distribute.DistributedDataset ، می توانید یک tf.distribute.DistributedIterator با استفاده از iter API روی آن ایجاد کنید. با یک تکرار کننده صریح، می توانید برای تعداد ثابتی از مراحل را تکرار کنید. برای دریافت عنصر بعدی از یک dist_iterator tf.distribute.DistributedIterator می توانید next(dist_iterator) ، dist_iterator.get_next() یا dist_iterator.get_next_as_optional() کنید. دو مورد قبلی در اصل یکسان هستند:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

با next() یا tf.distribute.DistributedIterator.get_next() ، اگر tf.distribute.DistributedIterator به پایان خود رسیده باشد، یک خطای OutOfRange پرتاب می شود. مشتری می تواند خطا را در سمت پایتون تشخیص دهد و به انجام کارهای دیگر مانند چک پوینت و ارزیابی ادامه دهد. با این حال، اگر از یک حلقه آموزشی میزبان استفاده می کنید (یعنی چندین مرحله را در هر tf.function )، که به نظر می رسد، این کار کار نخواهد کرد:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn شامل چندین مرحله با پیچاندن بدنه استپ در یک tf.range است. در این حالت، تکرارهای مختلف در حلقه بدون وابستگی می‌توانند به صورت موازی شروع شوند، بنابراین یک خطای OutOfRange می‌تواند در تکرارهای بعدی قبل از اتمام محاسبه تکرارهای قبلی ایجاد شود. هنگامی که یک خطای OutOfRange پرتاب می شود، تمام عملیات در تابع بلافاصله خاتمه می یابد. اگر این موردی است که می‌خواهید از آن اجتناب کنید، جایگزینی که خطای OutOfRange ایجاد نمی‌کند، tf.distribute.DistributedIterator.get_next_as_optional() است. get_next_as_optional یک tf.experimental.Optional را برمی‌گرداند که اگر tf.distribute.DistributedIterator به پایان رسیده باشد، عنصر بعدی یا هیچ مقداری را ندارد.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

با استفاده از ویژگی element_spec

اگر عناصر یک مجموعه داده توزیع شده را به یک tf.function و یک ضمانت tf.TypeSpec می خواهید، می توانید آرگومان input_signature tf.function مشخص کنید. خروجی یک مجموعه داده توزیع شده tf.distribute.DistributedValues است که می تواند ورودی یک دستگاه یا چندین دستگاه را نشان دهد. برای بدست آوردن tf.TypeSpec مربوط به این مقدار توزیع شده، می توانید از ویژگی element_spec مجموعه داده توزیع شده یا شی تکرار کننده توزیع شده استفاده کنید.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

دسته های جزئی

هنگامی که نمونه‌های tf.data.Dataset که کاربران ایجاد می‌کنند ممکن است شامل اندازه‌های دسته‌ای باشد که به طور مساوی بر تعداد کپی‌ها تقسیم نمی‌شوند یا زمانی که اصلی بودن نمونه مجموعه داده بر اندازه دسته‌ای تقسیم‌پذیر نباشد، با دسته‌های جزئی مواجه می‌شویم. این بدان معناست که وقتی مجموعه داده بر روی چندین نسخه توزیع می‌شود، فراخوانی next در برخی تکرارکننده‌ها منجر به خطای OutOfRange می‌شود. برای رسیدگی به این مورد استفاده، tf.distribute دسته‌های ساختگی با اندازه دسته‌ای 0 را روی کپی‌هایی که داده دیگری برای پردازش ندارند، برمی‌گرداند.

برای مورد تک کارگر، اگر داده‌ها توسط تماس next در تکرارکننده بازگردانده نشود، دسته‌های ساختگی با اندازه دسته‌ای 0 ایجاد می‌شوند و همراه با داده‌های واقعی در مجموعه داده استفاده می‌شوند. در مورد دسته‌های جزئی، آخرین دسته جهانی داده‌ها شامل داده‌های واقعی در کنار دسته‌های ساختگی داده‌ها خواهد بود. شرایط توقف برای پردازش داده ها اکنون بررسی می کند که آیا هر یک از ماکت ها داده ای دارند یا خیر. اگر هیچ داده ای در مورد هیچ یک از کپی ها وجود نداشته باشد، یک خطای OutOfRange پرتاب می شود.

برای مورد چند کارگری، مقدار بولی که نشان‌دهنده حضور داده‌ها در هر یک از کارگران است با استفاده از ارتباط متقابل تکراری جمع‌آوری می‌شود و برای شناسایی اینکه آیا همه کارگران پردازش مجموعه داده توزیع‌شده را به پایان رسانده‌اند، استفاده می‌شود. از آنجایی که این شامل ارتباط متقابل کارگری است، جریمه عملکردی نیز در بر دارد.

هشدارها

  • هنگام استفاده از APIهای tf.distribute.Strategy.experimental_distribute_dataset با راه‌اندازی چند کارگر، کاربران یک tf.data.Dataset را ارسال می‌کنند که از فایل‌ها می‌خواند. اگر tf.data.experimental.AutoShardPolicy روی AUTO یا FILE تنظیم شده باشد، اندازه دسته واقعی در هر مرحله ممکن است کوچکتر از اندازه دسته کلی تعریف شده توسط کاربر باشد. این می تواند زمانی اتفاق بیفتد که عناصر باقی مانده در فایل کمتر از اندازه دسته ای جهانی باشند. کاربران می‌توانند مجموعه داده را بدون بسته به تعداد مراحل اجرا، خسته کنند یا tf.data.experimental.AutoShardPolicy را روی DATA تنظیم کنند تا دور آن کار کنند.

  • تبدیل‌های مجموعه داده حالتی در حال حاضر با tf.distribute پشتیبانی نمی‌شوند و هرگونه عملیات حالتی که مجموعه داده ممکن است داشته باشد در حال حاضر نادیده گرفته می‌شود. به عنوان مثال، اگر مجموعه داده شما دارای یک map_fn است که از tf.random.uniform برای چرخاندن یک تصویر استفاده می کند، آنگاه یک نمودار مجموعه داده دارید که به حالت (یعنی دانه تصادفی) در ماشین محلی که در آن فرآیند پایتون در حال اجرا است بستگی دارد.

  • Experimental tf.data.experimental.OptimizationOptions هایی که به طور پیش فرض غیرفعال می شوند می توانند در زمینه های خاصی -- مانند زمانی که همراه با tf.distribute استفاده می شوند -- باعث کاهش عملکرد شوند. شما باید آنها را فقط پس از تأیید اعتبار آنها فعال کنید که عملکرد حجم کاری شما در یک تنظیم توزیع مفید است.

  • لطفاً برای نحوه بهینه سازی خط لوله ورودی خود با tf.data به طور کلی به این راهنما مراجعه کنید. چند نکته اضافی:

    • اگر چندین کارگر دارید و از tf.data.Dataset.list_files برای ایجاد یک مجموعه داده از همه فایل‌های منطبق با یک یا چند الگوی glob استفاده می‌کنید، به یاد داشته باشید که آرگومان seed را تنظیم کنید یا shuffle=False را طوری تنظیم کنید که هر کارگر فایل را به طور پیوسته خرد کند.

    • اگر خط لوله ورودی شما شامل مخلوط کردن داده ها در سطح رکورد و تجزیه داده ها می شود، مگر اینکه داده های تجزیه نشده به طور قابل توجهی بزرگتر از داده های تجزیه شده باشد (که معمولاً اینطور نیست)، ابتدا مخلوط کنید و سپس تجزیه کنید، همانطور که در مثال زیر نشان داده شده است. این ممکن است برای استفاده و عملکرد حافظه مفید باشد.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) یک بافر داخلی از عناصر buffer_size را حفظ می کند، و بنابراین کاهش buffer_size می تواند مشکل OOM را کاهش دهد.

  • ترتیب پردازش داده ها توسط کارگران هنگام استفاده از tf.distribute.experimental_distribute_dataset یا tf.distribute.distribute_datasets_from_function تضمین نمی شود. اگر از tf.distribute برای پیش‌بینی مقیاس استفاده می‌کنید، معمولاً این مورد ضروری است. با این حال، می توانید برای هر عنصر در دسته یک شاخص درج کنید و خروجی ها را بر اساس آن سفارش دهید. قطعه زیر نمونه ای از نحوه سفارش خروجی ها است.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

اگر از نمونه canonical tf.data.Dataset استفاده نمی کنم، چگونه داده های خود را توزیع کنم؟

گاهی اوقات کاربران نمی توانند از tf.data.Dataset برای نمایش ورودی خود و متعاقباً از APIهای ذکر شده در بالا برای توزیع مجموعه داده در چندین دستگاه استفاده کنند. در چنین مواردی می توانید از تانسورهای خام یا ورودی های یک ژنراتور استفاده کنید.

برای ورودی های تانسور دلخواه، از functional_distribute_values_from_function استفاده کنید

strategy.run tf.distribute.DistributedValues را می پذیرد که خروجی next(iterator) است. برای ارسال مقادیر تانسور، از experimental_distribute_values_from_function برای ساخت tf.distribute.DistributedValues از تانسورهای خام استفاده کنید.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

اگر ورودی شما از یک ژنراتور است، از tf.data.Dataset.from_generator استفاده کنید

اگر یک تابع مولد دارید که می خواهید استفاده کنید، می توانید یک نمونه tf.data.Dataset با استفاده از from_generator API ایجاد کنید.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.