ترجمت واجهة Cloud Translation API‏ هذه الصفحة.
Switch to English

المدخلات الموزعة

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تنزيل دفتر الملاحظات

توفر واجهات برمجة التطبيقات tf.distribute طريقة سهلة للمستخدمين لتوسيع نطاق تدريبهم من جهاز واحد إلى أجهزة متعددة. عند قياس نموذجهم ، يتعين على المستخدمين أيضًا توزيع مدخلاتهم عبر أجهزة متعددة. يوفر tf.distribute واجهات برمجة التطبيقات التي يمكنك من خلالها توزيع tf.distribute تلقائيًا عبر الأجهزة.

سيوضح لك هذا الدليل الطرق المختلفة التي يمكنك من خلالها إنشاء مجموعة بيانات tf.distribute باستخدام tf.distribute APIs. بالإضافة إلى ذلك ، سيتم تغطية الموضوعات التالية:

لا يغطي هذا الدليل استخدام المدخلات الموزعة مع Keras APIs.

مجموعات البيانات الموزعة

لاستخدام tf.distribute APIs tf.distribute ، يوصى بأن يستخدم المستخدمون tf.data.Dataset لتمثيل مدخلاتهم. تم إجراء tf.distribute للعمل بكفاءة مع tf.data.Dataset (على سبيل المثال ، الجلب المسبق التلقائي للبيانات على كل جهاز تسريع) مع تحسينات الأداء التي يتم دمجها بانتظام في التنفيذ. إذا كانت لديك حالة استخدام لاستخدام شيء آخر غير tf.data.Dataset ، فيرجى الرجوع إلى قسم لاحق في هذا الدليل. في حلقة التدريب غير الموزعة ، يقوم المستخدمون أولاً بإنشاء مثيل tf.data.Dataset ثم التكرار على العناصر. فمثلا:

 import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
 
2.3.0

 global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))

 
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

للسماح للمستخدمين باستخدام إستراتيجية tf.distribute مع الحد الأدنى من التغييرات على التعليمات البرمجية الحالية للمستخدم ، تم تقديم اثنين من واجهات برمجة التطبيقات التي من شأنها توزيع مثيل tf.data.Dataset وإرجاع كائن مجموعة بيانات موزع. يمكن للمستخدم بعد ذلك تكرار هذه النسخة من مجموعة البيانات الموزعة وتدريب نموذجهم كما كان من قبل. دعونا نلقي نظرة الآن على tf.distribute.Strategy.experimental_distribute_dataset API - tf.distribute.Strategy.experimental_distribute_dataset و tf.distribute.Strategy.experimental_distribute_datasets_from_function بمزيد من التفاصيل:

tf.distribute.Strategy.experimental_distribute_dataset

إستعمال

تأخذ واجهة برمجة التطبيقات هذه مثيل tf.data.Dataset كإدخال وترجع مثيل tf.distribute.DistributedDataset . يجب عليك تجميع مجموعة بيانات الإدخال بقيمة تساوي حجم الدُفعة العالمي. حجم الدُفعة العالمي هذا هو عدد العينات التي تريد معالجتها عبر جميع الأجهزة في خطوة واحدة. يمكنك تكرار مجموعة البيانات الموزعة هذه بطريقة Pythonic أو إنشاء مكرر باستخدام iter . الكائن الذي تم إرجاعه ليس نسخة tf.data.Dataset ولا يدعم أي واجهات برمجة تطبيقات أخرى تقوم بتحويل أو فحص مجموعة البيانات بأي شكل من الأشكال. هذه هي واجهة برمجة التطبيقات الموصى بها إذا لم يكن لديك طرق محددة تريد من خلالها مشاركة إدخالاتك عبر نسخ متماثلة مختلفة.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/data/ops/multi_device_iterator_ops.py:601: get_next_as_optional (from tensorflow.python.data.ops.iterator_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Iterator.get_next_as_optional()` instead.
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

الخصائص

الخلط

tf.distribute tf.distribute بإعادة tf.data.Dataset مثيل tf.data.Dataset للإدخال بحجم دُفعة جديد يساوي حجم الدُفعة العام مقسومًا على عدد النسخ المتماثلة المتزامنة. عدد النسخ المتماثلة المتزامنة يساوي عدد الأجهزة التي تشارك في تقليل التدرج اللوني أثناء التدريب. عندما يقوم المستخدم بالاتصال next على المكرر الموزع ، يتم إرجاع حجم مجموعة البيانات لكل نسخة متماثلة على كل نسخة متماثلة. ستكون مجموعة بيانات مجموعة البيانات التي تم إعادة تلقيمها دائمًا مضاعفة لعدد النسخ المتماثلة. هنا بضعة أمثلة:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    بدون توزيع:

    الدفعة 1: [0 ، 1 ، 2 ، 3]

    الدفعة 2: [4 ، 5]

    مع التوزيع على نسختين متماثلتين:

    الدفعة 1: النسخة المتماثلة 1: [0 ، 1] النسخة المتماثلة 2: [2 ، 3]

    الدفعة 2: النسخة المتماثلة 2: [4] النسخة المتماثلة 2: [5]

    يتم تقسيم الدفعة الأخيرة ([4 ، 5]) بين نسختين متماثلتين.

  • tf.data.Dataset.range(4).batch(4)

    بدون توزيع:

    الدفعة 1: [[0] ، [1] ، [2] ، [3]]

    مع التوزيع على 5 نسخ طبق الأصل:

    الدفعة 1: نسخة متماثلة 1: [0] نسخة متماثلة 2: [1] نسخة طبق الأصل 3: [2] نسخة طبق الأصل 4: [3] نسخة طبق الأصل 5: []

  • tf.data.Dataset.range(8).batch(4)

    بدون توزيع:

    الدفعة 1: [0 ، 1 ، 2 ، 3]

    الدفعة 2: [4 ، 5 ، 6 ، 7]

    مع التوزيع على 3 نسخ طبق الأصل:

    الدفعة 1: النسخة المتماثلة 1: [0 ، 1] النسخة المتماثلة 2: [2 ، 3] النسخة المتماثلة 3: []

    الدفعة 2: النسخة 1: [4 ، 5] النسخة 2: [6 ، 7] النسخة 3: []

تتضمن إعادة تجميع مجموعة البيانات تعقيدًا في الفراغ يزيد خطيًا مع عدد النسخ المتماثلة. هذا يعني أنه بالنسبة لحالة استخدام التدريب متعدد العمال ، يمكن أن يتعرض خط أنابيب الإدخال لأخطاء OOM.

تقاسم

tf.distribute أيضًا بتوزيع مجموعة بيانات الإدخال تلقائيًا في تدريب متعدد العمال. يتم إنشاء كل مجموعة بيانات على جهاز CPU الخاص بالعامل. يعني مشاركة مجموعة البيانات tf.data.experimental.AutoShardPolicy على مجموعة من العمال أن كل عامل يتم تعيينه لمجموعة فرعية من مجموعة البيانات بأكملها (إذا تم تعيين tf.data.experimental.AutoShardPolicy الصحيح). هذا لضمان أنه في كل خطوة ، سيتم معالجة حجم الدفعة العالمية لعناصر مجموعة البيانات غير المتداخلة بواسطة كل عامل. يحتوي tf.data.experimental.DistributeOptions خيارين مختلفين يمكن تحديدهما باستخدام tf.data.experimental.DistributeOptions .

 dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
 

توجد ثلاثة خيارات مختلفة يمكنك تعيينها لـ tf.data.experimental.AutoShardPolicy :

  • AUTO (تلقائي): هذا هو الخيار الافتراضي مما يعني أنه سيتم إجراء محاولة لتقطيع الملف بواسطة FILE. فشلت محاولة التجزئة بواسطة FILE إذا لم يتم الكشف عن مجموعة بيانات قائمة على الملفات. tf.distribute وبعد ذلك تعود الى عملية التجزئة التي كتبها DATA. لاحظ أنه إذا كانت مجموعة بيانات الإدخال قائمة على الملفات ولكن عدد الملفات أقل من عدد العاملين ، فسيظهر خطأ.
  • ملف: هذا هو الخيار إذا كنت تريد تقسيم ملفات الإدخال على جميع العاملين. إذا كان عدد الملفات أقل من عدد العمال ، فسيظهر خطأ. يجب عليك استخدام هذا الخيار إذا كان عدد ملفات الإدخال أكبر بكثير من عدد العاملين وكانت البيانات الموجودة في الملفات موزعة بالتساوي. الجانب السلبي لهذا الخيار هو وجود عمال خامل إذا لم يتم توزيع البيانات الموجودة في الملفات بالتساوي. على سبيل المثال ، دعنا نوزع ملفين على عاملين مع نسخة متماثلة واحدة لكل منهما. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5] ويحتوي الملف 2 على [6 ، 7 ، 8 ، 9 ، 10 ، 11]. اجعل العدد الإجمالي للنسخ المتماثلة المتزامنة 2 ويكون حجم الدُفعة العام 4.

    • العامل 0:

    الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]

    الدفعة 2 = النسخة المتماثلة 1: [2 ، 3]

    الدفعة 3 = النسخة المتماثلة 1: [4]

    الدفعة 4 = النسخة المتماثلة 1: [5]

    • العامل 1:

    الدفعة 1 = النسخة 2: [6، 7]

    الدفعة 2 = النسخة 2: [8، 9]

    الدفعة 3 = النسخة المتماثلة 2: [10]

    الدفعة 4 = النسخة 2: [11]

  • البيانات: سيؤدي هذا تلقائيًا إلى تشغيل العناصر عبر جميع العمال. سيقرأ كل عامل مجموعة البيانات بالكامل ويعالج الجزء المخصص لها فقط. سيتم التخلص من جميع الأجزاء الأخرى. يستخدم هذا بشكل عام إذا كان عدد ملفات الإدخال أقل من عدد العاملين وتريد مشاركة أفضل للبيانات عبر جميع العمال. الجانب السلبي هو أن مجموعة البيانات بأكملها ستتم قراءتها على كل عامل. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. دع العدد الإجمالي للنسخ المتماثلة المتزامنة يكون 2.

    • العامل 0:

    الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]

    الدفعة 2 = النسخة 1: [4، 5]

    الدفعة 3 = النسخة المتماثلة 1: [8 ، 9]

    • العامل 1:

    الدفعة 1 = النسخة المتماثلة 2: [2 ، 3]

    الدفعة 2 = النسخة 2: [6، 7]

    الدفعة 3 = النسخة 2: [10، 11]

  • إيقاف التشغيل: إذا قمت بإيقاف تشغيل ميزة المشاركة التلقائية ، فسيقوم كل عامل بمعالجة جميع البيانات. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. دع العدد الإجمالي للنسخ المتماثلة المتزامنة يكون 2. ثم سيرى كل عامل التوزيع التالي:

    • عامل 0:

    الدفعة 1 = النسخة المتماثلة 1: [0، 1]

    الدفعة 2 = النسخة المتماثلة 1: [2 ، 3]

    الدفعة 3 = النسخة المتماثلة 1: [4 ، 5]

    الدفعة 4 = النسخة 1: [6، 7]

    الدفعة 5 = النسخة 1: [8، 9]

    الدفعة 6 = النسخة المتماثلة 1: [10 ، 11]

    • العامل 1:

    الدفعة 1 = النسخة المتماثلة 2: [0، 1]

    الدفعة 2 = النسخة المتماثلة 2: [2 ، 3]

    الدفعة 3 = النسخة 2: [4، 5]

    الدفعة 4 = النسخة المتماثلة 2: [6 ، 7]

    الدفعة 5 = النسخة المتماثلة 2: [8 ، 9]

    الدفعة 6 = النسخة 2: [10، 11]

الجلب المسبق

بشكل افتراضي ، يضيف tf.distribute تحويل الجلب المسبق في نهاية المستخدم الذي قدمه tf.data.Dataset المثيل. وسيطة تحويل الجلب المسبق والتي تكون buffer_size تساوي عدد النسخ المتماثلة المتزامنة.

tf.distribute.Strategy.experimental_distribute_datasets_from_function

الاستخدام

تأخذ واجهة برمجة التطبيقات (API) هذه وظيفة إدخال وتقوم بإرجاع مثيل tf.distribute.DistributedDataset . دالة الإدخال التي يقوم المستخدمون tf.distribute.InputContext وسيطة tf.distribute.InputContext ويجب أن تُرجع مثيل tf.data.Dataset . باستخدام واجهة برمجة التطبيقات هذه ، لا tf.distribute أية تغييرات أخرى على مثيل tf.data.Dataset للمستخدم الذي tf.data.Dataset إرجاعه من دالة الإدخال. يتحمل المستخدم مسؤولية تجميع وتقطيع مجموعة البيانات. يستدعي tf.distribute وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. بصرف النظر عن السماح للمستخدمين بتحديد منطق التجميع والتجزئة الخاص بهم ، توضح واجهة برمجة التطبيقات هذه أيضًا قابلية تطوير وأداء أفضل مقارنةً بـ tf.distribute.Strategy.experimental_distribute_dataset عند استخدامها لتدريب العديد من العمال.

 mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.experimental_distribute_datasets_from_function(dataset_fn)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

الخصائص

الخلط

يجب tf.data.Dataset نسخة tf.data.Dataset التي تمثل القيمة المرجعة لوظيفة الإدخال باستخدام حجم كل نسخة متماثلة. حجم الدُفعة لكل نسخة متماثلة هو حجم الدُفعة العمومي مقسومًا على عدد النسخ المتماثلة التي تشارك في تدريب المزامنة. هذا لأن tf.distribute يستدعي وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. يجب أن تكون مجموعة البيانات التي تم إنشاؤها على عامل معين جاهزة للاستخدام من قبل جميع النسخ المتماثلة الموجودة على هذا العامل.

تقاسم

يتم tf.distribute.InputContext الكائن tf.distribute.InputContext الذي يتم تمريره بشكل ضمني كوسيطة لوظيفة الإدخال الخاصة بالمستخدم بواسطة tf.distribute تحت الغطاء. يحتوي على معلومات حول عدد العمال ومعرف العامل الحالي وما إلى ذلك. يمكن أن تتعامل وظيفة الإدخال هذه مع تقسيم البيانات وفقًا للسياسات التي وضعها المستخدم باستخدام هذه الخصائص التي تعد جزءًا من كائن tf.distribute.InputContext .

الجلب المسبق

لا يضيف tf.distribute تحويل الجلب المسبق في نهاية tf.data.Dataset التي أرجعتها وظيفة الإدخال المقدمة من المستخدم.

تكرارات موزعة

على غرار مثيلات tf.data.Dataset غير الموزعة ، ستحتاج إلى إنشاء مكرر على tf.distribute.DistributedDataset مثيلات tf.distribute.DistributedDataset والوصول إلى العناصر الموجودة في tf.distribute.DistributedDataset . فيما يلي الطرق التي يمكنك من خلالها إنشاء tf.distribute.DistributedIterator واستخدامه لتدريب نموذجك:

الأعراف

استخدم بنية Pythonic for loop

يمكنك استخدام حلقة Pythonic سهلة الاستخدام للتكرار عبر tf.distribute.DistributedDataset . العناصر التي تم إرجاعها من tf.distribute.DistributedIterator يمكن أن تكون tf.Tensor مفرد أو tf.distribute.DistributedValues التي تحتوي على قيمة لكل نسخة متماثلة. سيعطي وضع الحلقة داخل وظيفة tf.function تعزيزًا للأداء. ومع ذلك ، فإن break return غير مدعومين حاليًا إذا تم وضع الحلقة داخل وظيفة tf.function . لا ندعم أيضًا وضع الحلقة داخل tf.function عند استخدام استراتيجيات متعددة العاملين مثل tf.distribute.experimental.MultiWorkerMirroredStrategy و tf.distribute.TPUStrategy . يعمل وضع الحلقة داخل tf.function لعامل واحد tf.distribute.TPUStrategy ولكن ليس عند استخدام tf.distribute.TPUStrategy TPU.

 global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

استخدم iter لإنشاء مكرر صريح

إلى كرر أكثر من العناصر في tf.distribute.DistributedDataset سبيل المثال، يمكنك إنشاء tf.distribute.DistributedIterator باستخدام iter API على ذلك. باستخدام مكرر صريح ، يمكنك التكرار لعدد ثابت من الخطوات. للحصول على العنصر التالي من tf.distribute.DistributedIterator مثيل dist_iterator ، يمكنك استدعاء next(dist_iterator) أو dist_iterator.get_next() أو dist_iterator.get_next_as_optional() . السابقان هما في الأساس نفس الشيء:

 num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
 
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

مع next() أو tf.distribute.DistributedIterator.get_next() ، إذا وصل tf.distribute.DistributedIterator إلى نهايته ، فسيتم طرح خطأ OutOfRange. يمكن للعميل التقاط الخطأ من جانب الثعبان ومواصلة القيام بأعمال أخرى مثل نقاط التفتيش والتقييم. ومع ذلك ، لن ينجح هذا إذا كنت تستخدم حلقة تدريب مضيف (على tf.function ، قم بتشغيل خطوات متعددة لكل tf.function ) ، والتي تبدو مثل:

 @tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))
 

يحتوي train_fn على خطوات متعددة عن طريق لف جسم الخطوة داخل نطاق tf.range . في هذه الحالة ، يمكن أن تبدأ التكرارات المختلفة في الحلقة بدون تبعية بالتوازي ، لذلك يمكن تشغيل خطأ OutOfRange في التكرارات اللاحقة قبل انتهاء حساب التكرارات السابقة. بمجرد إلقاء خطأ OutOfRange ، سيتم إنهاء جميع العمليات في الوظيفة على الفور. إذا كانت هذه هي الحالة التي ترغب في تجنبها ، فإن البديل الذي لا يلقي خطأ OutOfRange هو tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional بإرجاع tf.experimental.Optional الذي يحتوي على العنصر التالي أو لا يحتوي على قيمة إذا وصل tf.distribute.DistributedIterator إلى نهايته.

 # You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
 
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

باستخدام خاصية element_spec

إذا قمت بتمرير عناصر مجموعة بيانات موزعة إلى tf.function وتريد ضمان tf.TypeSpec ، فيمكنك تحديد وسيطة input_signature الخاصة tf.function . ناتج مجموعة البيانات الموزعة هو tf.distribute.DistributedValues التي يمكن أن تمثل المدخلات إلى جهاز واحد أو أجهزة متعددة. للحصول على tf.TypeSpec المطابق لهذه القيمة الموزعة ، يمكنك استخدام خاصية element_spec لمجموعة البيانات الموزعة أو كائن element_spec الموزع.

 global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs
  
  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

دفعات جزئية

تتم مصادفة الدُفعات الجزئية عندما قد تحتوي مثيلات tf.data.Dataset التي tf.data.Dataset المستخدمون على أحجام دُفعات لا تقبل القسمة بالتساوي على عدد النسخ المتماثلة أو عندما لا يمكن القسمة على عدد النسخ المتماثلة من مجموعة البيانات على حجم الدُفعة. هذا يعني أنه عندما يتم توزيع مجموعة البيانات على عدة نسخ متماثلة ، فإن الاستدعاء next على بعض التكرارات سينتج عنه OutOfRangeError. للتعامل مع حالة الاستخدام هذه ، يقوم tf.distribute بإرجاع دفعات وهمية بحجم الدفعة 0 على النسخ المتماثلة التي ليس لديها المزيد من البيانات tf.distribute .

بالنسبة لحالة العامل المنفرد ، إذا لم يتم إرجاع البيانات من next المكالمة next على المكرر ، يتم إنشاء مجموعات وهمية بحجم 0 دفعة واستخدامها مع البيانات الحقيقية في مجموعة البيانات. في حالة الدُفعات الجزئية ، ستحتوي الدفعة العامة الأخيرة من البيانات على بيانات حقيقية إلى جانب الدُفعات الوهمية من البيانات. تتحقق حالة الإيقاف الخاصة بمعالجة البيانات الآن من وجود بيانات في أي من النسخ المتماثلة. إذا لم تكن هناك بيانات في أي من النسخ المتماثلة ، فسيتم طرح خطأ OutOfRange.

بالنسبة لحالة العمال المتعددين ، يتم تجميع القيمة المنطقية التي تمثل وجود البيانات على كل عامل باستخدام الاتصال عبر النسخ المتماثلة ويتم استخدامها لتحديد ما إذا كان جميع العمال قد انتهوا من معالجة مجموعة البيانات الموزعة. نظرًا لأن هذا يتضمن التواصل عبر العمال ، فهناك بعض عقوبات الأداء.

تحفظات

  • عند استخدام tf.distribute.Strategy.experimental_distribute_dataset APIs مع إعداد عامل متعدد ، يمرر المستخدمون مجموعة بيانات tf.data.Dataset التي تقرأ من الملفات. إذا تم تعيين tf.data.experimental.AutoShardPolicy على AUTO أو FILE ، فقد يكون حجم الدُفعة الفعلي لكل خطوة أصغر من حجم الدُفعة العام الذي يحدده المستخدم. يمكن أن يحدث هذا عندما تكون العناصر المتبقية في الملف أقل من حجم الدُفعة العام. يمكن للمستخدمين إما استنفاد مجموعة البيانات دون الاعتماد على عدد خطوات التشغيل أو تعيين tf.data.experimental.AutoShardPolicy على DATA للتغلب عليها.

  • تحويلات مجموعة البيانات ذات الحالة غير مدعومة حاليًا مع tf.distribute وأي عمليات ذات حالة قد تكون مجموعة البيانات قد تم تجاهلها حاليًا. على سبيل المثال ، إذا كانت مجموعة البيانات الخاصة بك تحتوي على map_fn يستخدم tf.random.uniform لتدوير صورة ، tf.random.uniform رسم بياني لمجموعة البيانات يعتمد على الحالة (أي البذور العشوائية) على الجهاز المحلي حيث يتم تنفيذ عملية Python.

  • tf.data.experimental.OptimizationOptions أن tf.data.experimental.OptimizationOptions التي تم تعطيلها افتراضيًا في سياقات معينة - مثل استخدامها مع tf.distribute - إلى تدهور الأداء. يجب عليك تمكينها فقط بعد التحقق من أنها تستفيد من أداء عبء العمل الخاص بك في إعداد التوزيع.

  • الترتيب الذي تتم معالجة البيانات به بواسطة العمال عند استخدام tf.distribute.experimental_distribute_dataset أو tf.distribute.experimental_distribute_datasets_from_function غير مضمون. هذا مطلوب عادةً إذا كنت تستخدم tf.distribute التنبؤ. ومع ذلك ، يمكنك إدراج فهرس لكل عنصر في مخرجات الدفعة والطلب وفقًا لذلك. المقتطف التالي هو مثال لكيفية طلب المخرجات.

 mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

كيف أقوم بتوزيع بياناتي إذا لم أستخدم مثيل tf.data.Dataset الكنسي؟

في بعض الأحيان ، لا يمكن للمستخدمين استخدام tf.data.Dataset لتمثيل مدخلاتهم وبالتالي واجهات برمجة التطبيقات المذكورة أعلاه لتوزيع مجموعة البيانات على أجهزة متعددة. في مثل هذه الحالات ، يمكنك استخدام موتر خام أو مدخلات من المولد.

استخدم وظيفة التجريبية التجريبية للتوزيع_القيمة_ لإدخالات الموتر التعسفية

tf.distribute.DistributedValues strategy.run tf.distribute.DistributedValues وهو الناتج next(iterator) . لتمرير قيم الموتر ، استخدم experimental_distribute_values_from_function لإنشاء tf.distribute.DistributedValues من tf.distribute.DistributedValues الأولية.

 mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `experimental_run_v2` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

استخدم tf.data.Dataset.from_generator إذا كان الإدخال الخاص بك من مولد

إذا كان لديك وظيفة مولد تريد استخدامها ، يمكنك إنشاء نسخة tf.data.Dataset باستخدام from_generator API.

 mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
 
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)