العمل مع ClientData الخاص بـ tff.

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

تعتبر فكرة مجموعة البيانات التي يرمز إليها العملاء (مثل المستخدمين) ضرورية للحساب الموحد كما هو موضح في TFF. يوفر TFF واجهة tff.simulation.datasets.ClientData مجردة على هذا المفهوم، ومجموعات البيانات التي تستضيف TFF ( ستاكوفيرفلوو ، شكسبير ، emnist ، cifar100 ، و gldv2 ) عن تنفيذ هذه الواجهة.

إذا كنت تعمل على التعلم الاتحادية مع مجموعة البيانات الخاصة بك، TFF تشجع بشدة على تنفيذ إما ClientData اجهة أو استخدام واحدة من وظائف المساعد TFF لتوليد ClientData التي تمثل البيانات على القرص، على سبيل المثال tff.simulation.datasets.ClientData.from_clients_and_fn .

لأن معظم TFF في الأمثلة نهاية إلى نهاية تبدأ ClientData الكائنات، وتنفيذ ClientData التفاعل مع مجموعة البيانات المخصصة سيجعل من الاسهل لspelunk من خلال التعليمات البرمجية الموجودة مكتوبة مع TFF. علاوة على ذلك، tf.data.Datasets التي ClientData بنيات يمكن كرر أكثر مباشرة لانتاج هياكل numpy صفائف، لذلك ClientData كائنات يمكن استخدامها مع أي إطار ML مقرها بيثون قبل أن ينتقل إلى TFF.

هناك العديد من الأنماط التي يمكنك من خلالها تسهيل حياتك إذا كنت تنوي توسيع نطاق عمليات المحاكاة الخاصة بك إلى العديد من الأجهزة أو نشرها. أدناه سوف نسير من خلال عدد قليل من الطرق التي يمكننا استخدامها ClientData وTFF لجعل لدينا على نطاق صغير التكرار لنطاق واسع من التجارب لإنتاج تجربة نشر بأكبر قدر من السلاسة.

ما هو النمط الذي يجب علي استخدامه لتمرير ClientData إلى TFF؟

سوف نناقش اثنين الأعراف TFF في ClientData في العمق. إذا كنت تندرج ضمن أي من الفئتين أدناه ، فمن الواضح أنك ستفضل واحدة على الأخرى. إذا لم يكن الأمر كذلك ، فقد تحتاج إلى فهم أكثر تفصيلاً لإيجابيات وسلبيات كل منها لاتخاذ خيار أكثر دقة.

  • أريد التكرار في أسرع وقت ممكن على جهاز محلي ؛ لست بحاجة إلى أن أكون قادرًا على الاستفادة بسهولة من وقت تشغيل TFF الموزع.

    • تريد تمرير tf.data.Datasets في لTFF مباشرة.
    • وهذا يسمح لك البرنامج حتما مع tf.data.Dataset الكائنات، ومعالجتها بشكل تعسفي.
    • يوفر مرونة أكثر من الخيار أدناه ؛ يتطلب دفع المنطق للعملاء أن يكون هذا المنطق قابلاً للتسلسل.
  • أرغب في تشغيل حسابي المتحد في وقت تشغيل TFF البعيد ، أو أخطط للقيام بذلك قريبًا.

    • في هذه الحالة ، تريد تعيين بناء مجموعة البيانات والمعالجة المسبقة للعملاء.
    • هذه النتائج فيكم تمر ببساطة قائمة client_ids مباشرة إلى حساب الاتحادية الخاص بك.
    • يؤدي دفع إنشاء مجموعة البيانات والمعالجة المسبقة للعملاء إلى تجنب الاختناقات في التسلسل ، وزيادة الأداء بشكل كبير مع مئات الآلاف من العملاء.

قم بإعداد بيئة مفتوحة المصدر

حزم الاستيراد

معالجة كائن ClientData

دعونا نبدأ اليوم التحميل واستكشاف TFF في EMNIST ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:20<00:00, 8391642.15it/s]
2021-09-03 11:17:37.005231: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

يمكن أن تفقد بيانات الأولى تخبرنا ما هو نوع من الأمثلة هي في ClientData .

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

علما بأن عائدات مجموعة البيانات collections.OrderedDict الكائنات التي لها pixels و label مفاتيح، حيث بكسل هو موتر مع شكل [28, 28] . لنفترض أننا نود أن تتسطح المدخلات للخروج إلى شكل [784] . أن أحد السبل الممكنة يمكننا أن نفعل هذا يكون لتطبيق وظيفة ما قبل التصنيع لدينا ClientData الكائن.

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

قد نرغب بالإضافة إلى إجراء بعض المعالجة المسبقة الأكثر تعقيدًا (وربما ذات الحالة) ، على سبيل المثال الخلط.

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

التواصل مع tff.Computation

الآن يمكننا أن أداء بعض التلاعب الأساسية مع ClientData الأشياء، ونحن مستعدون للبيانات تغذية ل tff.Computation . نحدد tff.templates.IterativeProcess الذي ينفذ اتحاد المتوسط ، واستكشاف طرق مختلفة لتمرير البيانات.

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

وقبل أن نبدأ العمل مع هذه IterativeProcess ، تعليق واحد على دلالات ClientData في محله. A ClientData يمثل الكائن مجمل السكان متاح للتدريب الاتحادية، التي بشكل عام هو غير متوفرة للبيئة تنفيذ نظام FL إنتاج وغير محددة لمحاكاة. ClientData الواقع يعطي المستخدم القدرة على تجاوز الحوسبة الاتحادية تماما وببساطة تدريب نموذج من جانب الخادم كالمعتاد عبر ClientData.create_tf_dataset_from_all_clients .

بيئة محاكاة TFF تضع الباحث في سيطرة كاملة على الحلقة الخارجية. على وجه الخصوص ، هذا يعني أن اعتبارات توفر العميل ، وانقطاع العميل ، وما إلى ذلك ، يجب معالجتها من قبل المستخدم أو برنامج بايثون النصي. يمكن للمرء عن التسرب العميل مثال نموذجي عن طريق ضبط توزيع المعاينة على الخاص ClientData's client_ids مثل أن المستخدمين مع المزيد من البيانات (وتبعا لأطول تشغيل الحسابات المحلية) وسيتم اختيار مع انخفاض احتمال.

ومع ذلك ، في النظام الفيدرالي الحقيقي ، لا يمكن اختيار العملاء بشكل صريح من قبل المدرب النموذجي ؛ يتم تفويض اختيار العملاء للنظام الذي يقوم بتنفيذ الحساب الموحد.

يمر tf.data.Datasets مباشرة إلى TFF

خيار واحد لدينا للتفاعل بين ClientData و IterativeProcess هو أن بناء tf.data.Datasets في بيثون، وتمرير هذه المجموعات إلى TFF.

لاحظ أن إذا أردنا استخدام preprocessed لدينا ClientData مجموعات البيانات سلمنا هي من النوع المناسب المتوقع من قبل نموذجنا المحدد أعلاه.

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:60: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:60: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 3.1523890495300293, round time 4.611936569213867
loss 3.084413528442383, round time 0.512113094329834
loss 2.785008192062378, round time 0.5057694911956787
loss 3.0030126571655273, round time 0.5013787746429443
loss 2.6526098251342773, round time 0.49185681343078613

وإذا أخذنا هذا الطريق، ولكننا لن تكون قادرة على التحرك بشكل مسلي لمحاكاة المتعدد ذات. مجموعات البيانات نبني في وقت TensorFlow المحلي يمكن التقاط دولة من البيئة المحيطة الثعبان، وتفشل في التسلسل أو إلغاء التسلسل عندما تحاول الدولة المرجعية التي لم تعد متوفرة لهم. هذا يمكن أن تظهر على سبيل المثال في خطأ غامض من TensorFlow في tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

رسم خرائط البناء والمعالجة المسبقة للعملاء

لتجنب هذه المشكلة، توصي TFF مستخدميها للنظر في مجموعة البيانات مثيل وتجهيزها بوصفها شيئا ما يحدث محليا على كل عميل، واستخدام المساعدين TFF أو federated_map لتشغيل صراحة هذا الرمز تجهيزها في كل عميل.

من الناحية المفاهيمية ، فإن سبب تفضيل ذلك واضح: في وقت التشغيل المحلي لـ TFF ، يكون للعملاء "عرضًا" فقط الوصول إلى بيئة Python العالمية نظرًا لحقيقة أن التنسيق الموحد بأكمله يحدث على جهاز واحد. من الجدير بالذكر في هذه المرحلة أن التفكير المماثل يؤدي إلى فلسفة TFF الوظيفية عبر الأنظمة الأساسية والقابلة للتسلسل دائمًا.

TFF يجعل مثل هذا التغيير بسيط عبر ClientData's السمة dataset_computation ، و tff.Computation الذي يأخذ client_id وإرجاع المرتبطة tf.data.Dataset .

لاحظ أن preprocess يعمل ببساطة مع dataset_computation . و dataset_computation سمة من preprocessed ClientData تضم خط أنابيب تجهيزها بالكامل حددنا فقط:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

نحن يمكن استدعاء dataset_computation والحصول على بيانات حريصة في وقت التشغيل بيثون، ولكن يمارس السلطة الحقيقية لهذا النهج عندما يؤلف مع عملية تكرارية أو حساب آخر لتجنب تجسيد هذه المجموعات في وقت حريصة العالمي على الإطلاق. يوفر TFF وظيفة مساعد tff.simulation.compose_dataset_computation_with_iterative_process التي يمكن استخدامها لتفعل بالضبط هذا.

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

كل من هذه tff.templates.IterativeProcesses واحدة فوق تشغيل بنفس الطريقة. ولكن السابق يقبل مجموعات البيانات العميل preprocessed، وهذا الأخير يقبل سلاسل تمثل هويات العملاء، ومعالجة كل بناء مجموعة البيانات وتجهيزها في الجسم - في الواقع state يمكن أن تنتقل بين البلدين.

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 3.003934621810913, round time 1.6643006801605225
loss 2.8357160091400146, round time 0.6825358867645264
loss 2.648752450942993, round time 0.5224685668945312
loss 2.6852502822875977, round time 0.5058529376983643
loss 2.8946309089660645, round time 0.5055575370788574

التحجيم لأعداد كبيرة من العملاء

trainer_accepting_ids يمكن على الفور أن تستخدم في وقت التشغيل المتعدد ذات TFF، وويتجنب تتجسد tf.data.Datasets وحدة تحكم (وبالتالي تسلسل منهم وإرسالها إلى العمال).

يؤدي هذا إلى تسريع عمليات المحاكاة الموزعة بشكل كبير ، خاصة مع عدد كبير من العملاء ، ويتيح التجميع الوسيط لتجنب حدوث تسلسل / إلغاء تسلسل مماثل.

deepdive الاختياري: تكوين منطق المعالجة المسبقة يدويًا في TFF

تم تصميم TFF للتكوين من الألف إلى الياء ؛ نوع التكوين الذي يؤديه للتو مساعد TFF هو تحت سيطرتنا تمامًا كمستخدمين. أننا يمكن أن يكون يدويا تكوين حساب تجهيزها حددنا للتو مع المدرب تلقاء next بكل بساطة:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

في الواقع ، هذا هو ما يفعله المساعد الذي استخدمناه بشكل فعال تحت الغطاء (بالإضافة إلى إجراء فحص ومعالجة مناسبة للنوع). نحن يمكن حتى أعربوا عن نفس منطق مختلف قليلا، من خلال تسلسل preprocess_and_shuffle إلى tff.Computation ، ومتحللة federated_map في خطوة واحدة الذي يبني قواعد البيانات-preprocessed الامم المتحدة وآخر الذي يدير preprocess_and_shuffle في كل عميل.

يمكننا التحقق من أن هذا المسار اليدوي ينتج عنه عمليات حسابية بنفس نوع التوقيع مثل مساعد TFF (أسماء معلمات النموذج):

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)