لدي سؤال؟ تواصل مع المجتمع في منتدى زيارة منتدى TensorFlow

المدخلات الموزعة

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

توفر واجهات برمجة التطبيقات tf.distribute طريقة سهلة للمستخدمين لتوسيع نطاق تدريبهم من جهاز واحد إلى أجهزة متعددة. عند قياس نموذجهم ، يتعين على المستخدمين أيضًا توزيع مدخلاتهم عبر أجهزة متعددة. يوفر tf.distribute واجهات برمجة تطبيقات يمكنك من خلالها توزيع tf.distribute تلقائيًا عبر الأجهزة.

سيوضح لك هذا الدليل الطرق المختلفة التي يمكنك من خلالها إنشاء مجموعة بيانات tf.distribute باستخدام tf.distribute APIs. بالإضافة إلى ذلك ، سيتم تغطية الموضوعات التالية:

لا يغطي هذا الدليل استخدام المدخلات الموزعة مع Keras APIs.

مجموعات البيانات الموزعة

لاستخدام tf.distribute APIs tf.distribute ، يوصى بأن يستخدم المستخدمونtf.data.Dataset لتمثيل مدخلاتهم. tf.distribute للعمل بكفاءة معtf.data.Dataset (على سبيل المثال ، الجلب المسبق التلقائي للبيانات على كل جهاز تسريع) مع تحسينات الأداء التي يتم دمجها بانتظام في التنفيذ. إذا كانت لديك حالة استخدام لاستخدام شيء آخر غيرtf.data.Dataset ، فيرجى الرجوع إلى قسم لاحق في هذا الدليل. في حلقة التدريب غير الموزعة ، يقوم المستخدمون أولاً بإنشاء مثيلtf.data.Dataset ثم التكرار على العناصر. على سبيل المثال:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

للسماح للمستخدمين باستخدام إستراتيجية tf.distribute مع الحد الأدنى من التغييرات على التعليمات البرمجية الحالية للمستخدم ، تم تقديم اثنين من واجهات برمجة التطبيقات التي من شأنها توزيع مثيلtf.data.Dataset وإرجاع كائن مجموعة البيانات الموزعة. يمكن للمستخدم بعد ذلك التكرار عبر مثيل مجموعة البيانات الموزعة هذا وتدريب نموذجهم كما كان من قبل. دعونا الآن نلقي نظرة على tf.distribute.Strategy.experimental_distribute_dataset API - tf.distribute.Strategy.experimental_distribute_dataset و tf.distribute.Strategy.distribute_datasets_from_function :

tf.distribute.Strategy.experimental_distribute_dataset

إستعمال

تأخذ واجهة برمجة التطبيقات هذه مثيلtf.data.Dataset كإدخال وترجع مثيل tf.distribute.DistributedDataset . يجب عليك تجميع مجموعة بيانات الإدخال بقيمة تساوي حجم الدُفعة العالمي. حجم الدُفعة العالمي هذا هو عدد العينات التي تريد معالجتها عبر جميع الأجهزة في خطوة واحدة. يمكنك تكرار مجموعة البيانات الموزعة هذه بطريقة Pythonic أو إنشاء مكرر باستخدام iter . الكائن الذي تم إرجاعه ليس مثيلtf.data.Dataset ولا يدعم أي واجهات برمجة تطبيقات أخرى تقوم بتحويل مجموعة البيانات أو فحصها بأي شكل من الأشكال. هذه هي واجهة برمجة التطبيقات الموصى بها إذا لم يكن لديك طرق محددة تريد من خلالها مشاركة إدخالاتك عبر نسخ متماثلة مختلفة.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)

الخصائص

الخلط

tf.distribute tf.distributetf.data.Dataset مثيلtf.data.Dataset للإدخال بحجم دفعة جديد يساوي حجم الدُفعة العام مقسومًا على عدد النسخ المتماثلة قيد المزامنة. عدد النسخ المتماثلة المتزامنة يساوي عدد الأجهزة التي تشارك في تقليل التدرج اللوني أثناء التدريب. عندما يقوم المستخدم بالاتصال next على المكرر الموزع ، يتم إرجاع حجم مجموعة البيانات لكل نسخة متماثلة على كل نسخة متماثلة. سيكون عدد العناصر الأصلية لمجموعة البيانات المعاد تطعيمه دائمًا مضاعفًا لعدد النسخ المتماثلة. هنا بضعة أمثلة:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • بدون توزيع:
    • الدفعة 1: [0 ، 1 ، 2 ، 3]
    • الدفعة 2: [4 ، 5]
    • مع توزيع أكثر من نسختين متماثلة. الدفعة الأخيرة ([4 ، 5]) مقسمة بين نسختين متماثلتين.

    • الدفعة 1:

      • النسخة المتماثلة 1: [0، 1]
      • نسخة 2: [2، 3]
    • الدفعة 2:

      • نسخة 2: [4]
      • نسخة 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • بدون توزيع:
    • الدفعة 1: [[0] ، [1] ، [2] ، [3]]
    • مع التوزيع على 5 نسخ طبق الأصل:
    • الدفعة 1:
      • النسخة المتماثلة 1: [0]
      • النسخة المتماثلة 2: [1]
      • نسخة 3: [2]
      • نسخة 4: [3]
      • النسخة المكررة 5: []
  • tf.data.Dataset.range(8).batch(4)

    • بدون توزيع:
    • الدفعة 1: [0 ، 1 ، 2 ، 3]
    • الدفعة 2: [4 ، 5 ، 6 ، 7]
    • مع التوزيع على 3 نسخ طبق الأصل:
    • الدفعة 1:
      • النسخة المتماثلة 1: [0، 1]
      • نسخة 2: [2، 3]
      • نسخة 3: []
    • الدفعة 2:
      • نسخة 1: [4، 5]
      • نسخة 2: [6، 7]
      • نسخة 3: []

تتضمن إعادة تجميع مجموعة البيانات تعقيدًا في المساحة يزداد خطيًا مع عدد النسخ المتماثلة. هذا يعني أنه بالنسبة لحالة استخدام التدريب متعدد العمال ، يمكن أن يتعرض خط أنابيب الإدخال لأخطاء OOM.

التقسيم

tf.distribute أيضًا بالتوزيع التلقائي لمجموعة بيانات الإدخال في تدريب متعدد العمال باستخدام MultiWorkerMirroredStrategy و TPUStrategy . يتم إنشاء كل مجموعة بيانات على جهاز CPU الخاص بالعامل. تعني المشاركة التلقائية لمجموعة بيانات عبر مجموعة من العمال أنه يتم تعيين مجموعة فرعية من مجموعة البيانات بأكملها لكل عامل (إذا تم تعيين tf.data.experimental.AutoShardPolicy الصحيح). هذا لضمان أنه في كل خطوة ، سيتم معالجة حجم دفعة عالمية لعناصر مجموعة البيانات غير المتداخلة بواسطة كل عامل. يحتوي tf.data.experimental.DistributeOptions خيارين مختلفين يمكن تحديدهما باستخدام tf.data.experimental.DistributeOptions . لاحظ أنه لا توجد مشاركة تلقائية في تدريب العمال المتعددين باستخدام ParameterServerStrategy ، ويمكن العثور على مزيد من المعلومات حول إنشاء مجموعة البيانات باستخدام هذه الإستراتيجية في البرنامج التعليمي Parameter Server Strategy .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

توجد ثلاثة خيارات مختلفة يمكنك تعيينها لـ tf.data.experimental.AutoShardPolicy :

  • تلقائي: هذا هو الخيار الافتراضي مما يعني أنه سيتم إجراء محاولة لجزء بواسطة FILE. تفشل محاولة التجزئة حسب FILE إذا لم يتم اكتشاف مجموعة بيانات قائمة على ملف. tf.distribute وبعد ذلك تعود الى عملية التجزئة التي كتبها DATA. لاحظ أنه إذا كانت مجموعة بيانات الإدخال تستند إلى ملف ولكن عدد الملفات أقل من عدد العمال ، فسيتم رفع خطأ InvalidArgumentError . إذا حدث هذا ، AutoShardPolicy.DATA السياسة صراحةً على AutoShardPolicy.DATA ، أو قسّم مصدر الإدخال إلى ملفات أصغر بحيث يكون عدد الملفات أكبر من عدد العمال.
  • FILE: هذا هو الخيار إذا كنت تريد تقسيم ملفات الإدخال إلى جميع العاملين. يجب عليك استخدام هذا الخيار إذا كان عدد ملفات الإدخال أكبر بكثير من عدد العمال والبيانات الموجودة في الملفات موزعة بالتساوي. يتمثل الجانب السلبي لهذا الخيار في وجود عمال خامل إذا لم يتم توزيع البيانات الموجودة في الملفات بالتساوي. إذا كان عدد الملفات أقل من عدد العمال ، فسيتم رفع خطأ InvalidArgumentError . إذا حدث هذا ، AutoShardPolicy.DATA السياسة صراحةً على AutoShardPolicy.DATA . على سبيل المثال ، دعنا نوزع ملفين على عاملين مع نسخة متماثلة واحدة لكل منهما. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5] ويحتوي الملف 2 على [6 ، 7 ، 8 ، 9 ، 10 ، 11]. اجعل العدد الإجمالي للنسخ المتماثلة المتزامنة 2 ويكون حجم الدُفعة العام 4.

    • عامل 0:
    • الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
    • الدفعة 2 = النسخة المتماثلة 1: [2 ، 3]
    • الدفعة 3 = النسخة المتماثلة 1: [4]
    • الدفعة 4 = النسخة المتماثلة 1: [5]
    • العامل 1:
    • الدفعة 1 = النسخة المتماثلة 2: [6 ، 7]
    • الدفعة 2 = النسخة المتماثلة 2: [8 ، 9]
    • الدفعة 3 = النسخة المتماثلة 2: [10]
    • الدفعة 4 = النسخة المتماثلة 2: [11]
  • البيانات: سيؤدي هذا إلى تجميع العناصر تلقائيًا عبر جميع العمال. سيقرأ كل عامل مجموعة البيانات بأكملها ويعالج الجزء المخصص لها فقط. سيتم التخلص من جميع القطع الأخرى. يتم استخدام هذا بشكل عام إذا كان عدد ملفات الإدخال أقل من عدد العمال وتريد تقسيم البيانات بشكل أفضل عبر جميع العاملين. الجانب السلبي هو أن مجموعة البيانات بأكملها ستتم قراءتها على كل عامل. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. اجعل العدد الإجمالي للنسخ المتماثلة المتزامنة 2.

    • عامل 0:
    • الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
    • الدفعة 2 = النسخة المتماثلة 1: [4 ، 5]
    • الدفعة 3 = النسخة المتماثلة 1: [8 ، 9]
    • العامل 1:
    • الدفعة 1 = النسخة المتماثلة 2: [2 ، 3]
    • الدفعة 2 = النسخة المتماثلة 2: [6 ، 7]
    • الدفعة 3 = النسخة المتماثلة 2: [10 ، 11]
  • إيقاف التشغيل: إذا قمت بإيقاف تشغيل ميزة "المشاركة التلقائية" ، فسيقوم كل عامل بمعالجة جميع البيانات. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. دع العدد الإجمالي للنسخ المتماثلة المتزامنة هو 2. ثم سيرى كل عامل التوزيع التالي:

    • عامل 0:
    • الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
    • الدفعة 2 = النسخة المتماثلة 1: [2 ، 3]
    • الدفعة 3 = النسخة المتماثلة 1: [4 ، 5]
    • الدفعة 4 = النسخة المتماثلة 1: [6 ، 7]
    • الدفعة 5 = النسخة المتماثلة 1: [8 ، 9]
    • الدفعة 6 = النسخة المتماثلة 1: [10 ، 11]

    • العامل 1:

    • الدفعة 1 = النسخة المتماثلة 2: [0 ، 1]

    • الدفعة 2 = النسخة المتماثلة 2: [2، 3]

    • الدفعة 3 = النسخة المتماثلة 2: [4 ، 5]

    • الدفعة 4 = النسخة المتماثلة 2: [6 ، 7]

    • الدفعة 5 = النسخة المتماثلة 2: [8 ، 9]

    • الدفعة 6 = النسخة المتماثلة 2: [10 ، 11]

الجلب المسبق

بشكل افتراضي ، يضيف tf.distribute تحويل الجلب المسبق في نهاية مثيلtf.data.Dataset المقدم من المستخدم. وسيطة تحويل الجلب المسبق التي تكون buffer_size تساوي عدد النسخ المتماثلة المتزامنة.

tf.distribute.Strategy.distribute_datasets_from_function

إستعمال

تأخذ واجهة برمجة التطبيقات هذه دالة إدخال وترجع مثيل tf.distribute.DistributedDataset . دالة الإدخال التي يمررها المستخدمون لها وسيطة tf.distribute.InputContext ويجب أن تُرجع مثيلtf.data.Dataset . باستخدام واجهة برمجة التطبيقات هذه ، لا tf.distribute أية تغييرات أخرى على مثيلtf.data.Dataset الخاص بالمستخدم الذيtf.data.Dataset إرجاعه من دالة الإدخال. تقع على عاتق المستخدم مسؤولية تجميع مجموعة البيانات وتقسيمها. يستدعي tf.distribute وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. بصرف النظر عن السماح للمستخدمين بتحديد منطق التجميع والتجزئة الخاص بهم ، توضح واجهة برمجة التطبيقات هذه أيضًا قابلية تطوير وأداء أفضل مقارنةً بـ tf.distribute.Strategy.experimental_distribute_dataset عند استخدامها لتدريب العديد من العمال.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

الخصائص

الخلط

يجبtf.data.Dataset مثيلtf.data.Dataset الذي يمثل قيمة الإرجاع لوظيفة الإدخال في دُفعات باستخدام حجم الدُفعة لكل نسخة متماثلة. حجم الدُفعة لكل نسخة متماثلة هو حجم الدُفعة العمومي مقسومًا على عدد النسخ المتماثلة التي تشارك في تدريب المزامنة. هذا لأن tf.distribute يستدعي وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. يجب أن تكون مجموعة البيانات التي تم إنشاؤها على عامل معين جاهزة للاستخدام من قبل جميع النسخ المتماثلة الموجودة على هذا العامل.

التقسيم

يتم tf.distribute.InputContext الكائن tf.distribute.InputContext الذي يتم تمريره ضمنيًا كوسيطة إلى وظيفة الإدخال الخاصة بالمستخدم بواسطة tf.distribute أسفل الغطاء. يحتوي على معلومات حول عدد العمال ومعرف العامل الحالي وما إلى ذلك. يمكن لوظيفة الإدخال هذه معالجة التجزئة وفقًا للسياسات التي حددها المستخدم باستخدام هذه الخصائص التي تعد جزءًا من كائن tf.distribute.InputContext .

الجلب المسبق

لا يضيف tf.distribute تحويل الجلب المسبق في نهايةtf.data.Dataset التي يتم إرجاعها بواسطة وظيفة الإدخال التي يوفرها المستخدم.

تكرارات موزعة

على غرار مثيلاتtf.data.Dataset غير الموزعة ، ستحتاج إلى إنشاء مكرر في مثيلات tf.distribute.DistributedDataset والوصول إلى العناصر الموجودة في tf.distribute.DistributedDataset . فيما يلي الطرق التي يمكنك من خلالها إنشاء tf.distribute.DistributedIterator واستخدامه لتدريب نموذجك:

الأعراف

استخدم بنية Pythonic for loop

يمكنك استخدام حلقة Pythonic سهلة الاستخدام للتكرار عبر tf.distribute.DistributedDataset . العناصر التي تم إرجاعها من tf.distribute.DistributedIterator يمكن أن تكون tf.Tensor مفرد أو tf.distribute.DistributedValues التي تحتوي على قيمة لكل نسخة متماثلة. سيؤدي وضع الحلقة داخل tf.function إلى تعزيز الأداء. ومع ذلك ، فإن break return غير مدعومين حاليًا لحلقة عبر tf.distribute.DistributedDataset الموضوعة داخل tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

استخدم iter لإنشاء مكرر صريح

إلى كرر أكثر من العناصر في tf.distribute.DistributedDataset سبيل المثال، يمكنك إنشاء tf.distribute.DistributedIterator باستخدام iter API على ذلك. باستخدام مكرر صريح ، يمكنك التكرار لعدد ثابت من الخطوات. للحصول على العنصر التالي من tf.distribute.DistributedIterator مثيل dist_iterator ، يمكنك استدعاء next(dist_iterator) أو dist_iterator.get_next() أو dist_iterator.get_next_as_optional() . السابقان هما في الأساس نفس الشيء:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

مع next() أو tf.distribute.DistributedIterator.get_next() ، إذا وصل tf.distribute.DistributedIterator إلى نهايته ، فسيتم طرح خطأ OutOfRange. يمكن للعميل اكتشاف الخطأ من جانب Python ومواصلة القيام بأعمال أخرى مثل نقاط التفتيش والتقييم. ومع ذلك ، لن ينجح هذا إذا كنت تستخدم حلقة تدريب مضيف (على tf.function ، قم بتشغيل خطوات متعددة لكل tf.function ) ، والتي تبدو كما يلي:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

يحتوي train_fn على خطوات متعددة عن طريق لف جسم الخطوة داخل tf.range . في هذه الحالة ، يمكن أن تبدأ التكرارات المختلفة في الحلقة بدون تبعية بالتوازي ، لذلك يمكن تشغيل خطأ OutOfRange في التكرارات اللاحقة قبل انتهاء حساب التكرارات السابقة. بمجرد إلقاء خطأ OutOfRange ، سيتم إنهاء جميع العمليات في الوظيفة على الفور. إذا كانت هذه بعض الحالات التي ترغب في تجنبها ، فإن البديل الذي لا tf.distribute.DistributedIterator.get_next_as_optional() خطأ OutOfRange هو tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional بإرجاع tf.experimental.Optional الذي يحتوي على العنصر التالي أو لا يحتوي على قيمة إذا وصل tf.distribute.DistributedIterator إلى نهايته.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
([0 1], [2 3])
([4 5], [6 7])
([8], [])

استخدام خاصية element_spec

إذا قمت بتمرير عناصر مجموعة بيانات موزعة إلى tf.function وتريد ضمان tf.TypeSpec ، فيمكنك تحديد وسيطة input_signature الخاصة tf.function . ناتج مجموعة البيانات الموزعة هو tf.distribute.DistributedValues التي يمكن أن تمثل الإدخال إلى جهاز واحد أو أجهزة متعددة. للحصول على tf.TypeSpec المطابق لهذه القيمة الموزعة ، يمكنك استخدام خاصية element_spec لمجموعة البيانات الموزعة أو كائن element_spec الموزع.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

دفعات جزئية

تتم مصادفة الدُفعات الجزئية عندما قد تحتوي مثيلاتtf.data.Dataset التيtf.data.Dataset المستخدمون على أحجام دُفعات غير قابلة للقسمة بالتساوي على عدد النسخ المتماثلة أو عندما لا يمكن القسمة على عدد النسخ المتماثلة من مجموعة البيانات على حجم الدُفعة. هذا يعني أنه عند توزيع مجموعة البيانات على عدة نسخ متماثلة ، فإن الاستدعاء next على بعض التكرارات سينتج عنه OutOfRangeError. للتعامل مع حالة الاستخدام هذه ، يقوم tf.distribute بإرجاع دفعات وهمية بحجم الدُفعة 0 على النسخ المتماثلة التي لا تحتوي على أي بيانات أخرى tf.distribute .

بالنسبة لحالة العامل المنفردة ، إذا لم يتم إرجاع البيانات بواسطة الاستدعاء next في المكرر ، يتم إنشاء دفعات وهمية بحجم 0 دفعة واستخدامها مع البيانات الحقيقية في مجموعة البيانات. في حالة الدُفعات الجزئية ، ستحتوي آخر دفعة عالمية من البيانات على بيانات حقيقية جنبًا إلى جنب مع مجموعات وهمية من البيانات. يتحقق شرط الإيقاف لمعالجة البيانات الآن مما إذا كان أي من النسخ المتماثلة يحتوي على بيانات. إذا لم تكن هناك بيانات في أي من النسخ المتماثلة ، فسيتم طرح خطأ OutOfRange.

بالنسبة للحالة متعددة العمال ، يتم تجميع القيمة المنطقية التي تمثل وجود البيانات على كل عامل باستخدام اتصال متماثل متقاطع ويستخدم هذا لتحديد ما إذا كان جميع العمال قد انتهوا من معالجة مجموعة البيانات الموزعة. نظرًا لأن هذا يتضمن التواصل عبر العمال ، فهناك بعض عقوبات الأداء.

تحفظات

  • عند استخدام tf.distribute.Strategy.experimental_distribute_dataset APIs مع إعداد عامل متعدد ، يمرر المستخدمونtf.data.Dataset الذي يقرأ من الملفات. إذا تم تعيين tf.data.experimental.AutoShardPolicy على AUTO أو FILE ، فقد يكون حجم الدُفعة الفعلي لكل خطوة أصغر من حجم الدُفعة العام الذي يحدده المستخدم. يمكن أن يحدث هذا عندما تكون العناصر المتبقية في الملف أقل من حجم الدُفعة العام. يمكن للمستخدمين إما استنفاد مجموعة البيانات دون الاعتماد على عدد خطوات التشغيل أو تعيين tf.data.experimental.AutoShardPolicy إلى DATA للتغلب عليها.

  • تحويلات مجموعة البيانات ذات الحالة غير مدعومة حاليًا مع tf.distribute وأي عمليات ذات حالة قد تكون مجموعة البيانات قد تم تجاهلها حاليًا. على سبيل المثال ، إذا كانت مجموعة البيانات الخاصة بك تحتوي على map_fn يستخدم tf.random.uniform لتدوير صورة ، tf.random.uniform رسم بياني لمجموعة البيانات يعتمد على الحالة (أي البذور العشوائية) على الجهاز المحلي حيث يتم تنفيذ عملية Python.

  • tf.data.experimental.OptimizationOptions أن tf.data.experimental.OptimizationOptions التجريبية tf.data.experimental.OptimizationOptions التي يتم تعطيلها افتراضيًا في سياقات معينة - مثل عند استخدامها مع tf.distribute - في تدهور الأداء. يجب عليك تمكينها فقط بعد التحقق من أنها تستفيد من أداء عبء العمل الخاص بك في إعداد التوزيع.

  • يرجى الرجوع إلى هذا الدليل لمعرفة كيفية تحسين خط أنابيب الإدخال الخاص بك باستخدام tf.data بشكل عام. بعض النصائح الإضافية:

    • إذا كان لديك العديد من العمال وكنت تستخدم tf.data.Dataset.list_files لإنشاء مجموعة بيانات من جميع الملفات التي تطابق نمطًا واحدًا أو أكثر من أنماط الكرة الأرضية ، فتذكر تعيين الوسيطة seed أو ضبط shuffle=False بحيث يقوم كل عامل بتقسيم الملف باستمرار.

    • إذا كان خط أنابيب الإدخال الخاص بك يتضمن كلاً من خلط البيانات على مستوى السجل وتحليل البيانات ، ما لم تكن البيانات غير المحللة أكبر بكثير من البيانات التي تم تحليلها (وهذا ليس هو الحال عادةً) ، فتبادل أولاً ثم قم بالتحليل ، كما هو موضح في المثال التالي. قد يفيد هذا استخدام الذاكرة والأداء.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) على مخزن مؤقت داخلي لعناصر buffer_size المخزن المؤقت ، وبالتالي تقليل buffer_size المخزن المؤقت يمكن أن يخفف من مشكلة OOM.

  • الترتيب الذي تتم معالجة البيانات به بواسطة العمال عند استخدام tf.distribute.experimental_distribute_dataset أو tf.distribute.distribute_datasets_from_function غير مضمون. عادة ما يكون هذا مطلوبًا إذا كنت تستخدم tf.distribute لتوقع المقياس. ومع ذلك ، يمكنك إدراج فهرس لكل عنصر في الدُفعة وطلب المخرجات وفقًا لذلك. المقتطف التالي هو مثال على كيفية طلب المخرجات.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}

كيف يمكنني توزيع بياناتي إذا كنت لا أستخدم مثيل tf.data.Dataset الكنسي؟

في بعض الأحيان لا يمكن للمستخدمين استخدامtf.data.Dataset لتمثيل مدخلاتهم وبالتالي واجهات برمجة التطبيقات المذكورة أعلاه لتوزيع مجموعة البيانات على أجهزة متعددة. في مثل هذه الحالات ، يمكنك استخدام موتر خام أو مدخلات من المولد.

استخدم دالة_توزيع_القيمة_التجريبية_وظيفة_من أجل مدخلات موتر تعسفية

strategy.run يقبل tf.distribute.DistributedValues وهو إخراج next(iterator) . لتمرير القيم الموترة، استخدم experimental_distribute_values_from_function لبناء tf.distribute.DistributedValues من التنسورات الخام.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

استخدم tf.data.Dataset.from_generator إذا كان الإدخال من مولد

إذا كانت لديك وظيفة منشئ تريد استخدامها ، فيمكنك إنشاء مثيلtf.data.Dataset باستخدام from_generator API.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)