কাস্টম সমষ্টি বাস্তবায়ন

TensorFlow.org এ দেখুন Google Colab-এ চালান GitHub-এ উৎস দেখুন নোটবুক ডাউনলোড করুন

এই টিউটোরিয়াল, আমরা পিছনে নকশা নীতির ব্যাখ্যা tff.aggregators মডিউল এবং সার্ভারে ক্লায়েন্ট থেকে মূল্যবোধের কাস্টম অ্যাগ্রিগেশন বাস্তবায়নের জন্য সর্বোত্তম অনুশীলন।

পূর্বশর্ত। এই টিউটোরিয়ালটি অনুমান যদি আপনার আগে থেকেই মৌলিক ধারণার সঙ্গে পরিচিত ফেডারেটেড কোর যেমন এদেশের (যেমন tff.SERVER , tff.CLIENTS ), কিভাবে TFF কম্পিউটেশন প্রতিনিধিত্ব করে ( tff.tf_computation , tff.federated_computation ) এবং তাদের টাইপ স্বাক্ষর।

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

নকশা সারাংশ

TFF মধ্যে "অ্যাগ্রিগেশন" এ মানগুলির সেট আন্দোলন বোঝায় tff.CLIENTS মধ্যে একই ধরণের একটি সমষ্টিগত মান উত্পাদন করতে tff.SERVER । যে, প্রতিটি পৃথক ক্লায়েন্ট মান উপলব্ধ করা প্রয়োজন হয় না. উদাহরণস্বরূপ, ফেডারেটেড লার্নিং-এ, সার্ভারে গ্লোবাল মডেলে প্রয়োগ করার জন্য একটি সামগ্রিক মডেল আপডেট পেতে ক্লায়েন্ট মডেল আপডেটগুলি গড় করা হয়।

যেমন এই লক্ষ্য পৌঁছনোর অপারেটার ছাড়াও tff.federated_sum , TFF উপলব্ধ tff.templates.AggregationProcess (ক stateful প্রক্রিয়া ) যা অ্যাগ্রিগেশন গণনার জন্য টাইপ স্বাক্ষর formalizes তাই এটি একটি সহজ সমষ্টি থেকে অনেক বেশি জটিল ফরম সাধারণের পারবেন না।

প্রধান উপাদান tff.aggregators মডিউল সৃষ্টির জন্য কারখানা AggregationProcess , যা দুই দিক TFF এর সাধারণত দরকারী এবং replacable বিল্ডিং ব্লক হওয়ার জন্য পরিকল্পিত:

  1. প্যারামিটারাইজড গণনা। সমষ্টি একটি স্বাধীন বিল্ডিং ব্লক যে সঙ্গে কাজ করার জন্য ডিজাইন করা অন্যান্য TFF মডিউল মধ্যে প্লাগ করা যেতে পারে tff.aggregators তাদের প্রয়োজনীয় অ্যাগ্রিগেশন parameterize করতে।

উদাহরণ:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. একত্রীকরণ রচনা। আরও জটিল যৌগিক সমষ্টি তৈরি করতে একটি সমষ্টি বিল্ডিং ব্লক অন্যান্য একত্রীকরণ বিল্ডিং ব্লকের সাথে তৈরি করা যেতে পারে।

উদাহরণ:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

এই টিউটোরিয়ালের বাকি অংশ ব্যাখ্যা করে কিভাবে এই দুটি লক্ষ্য অর্জন করা হয়।

একত্রিতকরণ প্রক্রিয়া

আমরা প্রথম সংক্ষেপ tff.templates.AggregationProcess , এবং তার সৃষ্টির জন্য কারখানা প্যাটার্ন সঙ্গে অনুসরণ করুন।

tff.templates.AggregationProcess একটি হল tff.templates.MeasuredProcess অ্যাগ্রিগেশন জন্য নির্দিষ্ট টাইপ স্বাক্ষর করেন। বিশেষ করে, initialize এবং next ফাংশন নিম্নলিখিত টাইপ স্বাক্ষর আছে:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

(টাইপ রাজ্যের state_type ) সার্ভারে স্থাপিত ইন করতে হবে। next ফাংশন ইনপুট যুক্তি রাষ্ট্র এবং একটি মান সমষ্টিগত করা (টাইপ হিসাবে লাগে value_type ) ক্লায়েন্ট স্থাপিত। * উপায়ে একটি ভরযুক্ত গড় উদাহরণস্বরূপ ওজন জন্য অন্যান্য ইনপুট আর্গুমেন্ট ঐচ্ছিক,। এটি একটি আপডেট করা স্টেট অবজেক্ট, সার্ভারে রাখা একই ধরনের সমষ্টিগত মান এবং কিছু পরিমাপ প্রদান করে।

নোট উভয় রাষ্ট্রের মৃত্যুদণ্ড কার্যকর মধ্যে পাস করা next ফাংশন, এবং রিপোর্ট পরিমাপ একটি নির্দিষ্ট সঞ্চালনের উপর নির্ভর করে কোন তথ্য প্রতিবেদন করার উদ্দেশ্যে next ফাংশন, খালি হতে পারে। তা সত্ত্বেও, TFF-এর অন্যান্য অংশগুলির জন্য একটি স্পষ্ট চুক্তি অনুসরণ করার জন্য তাদের স্পষ্টভাবে উল্লেখ করতে হবে।

অন্যান্য TFF মডিউল, উদাহরণস্বরূপ মডেল আপডেটের জন্য tff.learning , ব্যবহার করা আশা করা যায় tff.templates.AggregationProcess parameterize কিভাবে মান সমষ্টি করছে। যাইহোক, সমষ্টিগত মানগুলি ঠিক কী এবং তাদের টাইপ স্বাক্ষরগুলি কী তা নির্ভর করে প্রশিক্ষিত মডেলের অন্যান্য বিবরণ এবং এটি করতে ব্যবহৃত শেখার অ্যালগরিদমের উপর।

কম্পিউটেশন অন্যান্য দিক অ্যাগ্রিগেশন স্বাধীন করতে, আমরা কারখানা প্যাটার্ন ব্যবহার করেন - আমরা যথাযথ তৈরি tff.templates.AggregationProcess একবার বস্তুর প্রাসঙ্গিক ধরনের স্বাক্ষর সমষ্টিগত করা পাওয়া যায়, আবাহন করার মাধ্যমে create কারখানার পদ্ধতি। এইভাবে একত্রিতকরণ প্রক্রিয়ার সরাসরি পরিচালনা শুধুমাত্র গ্রন্থাগার লেখকদের জন্য প্রয়োজন, যারা এই সৃষ্টির জন্য দায়ী।

একত্রীকরণ প্রক্রিয়া কারখানা

ওজনহীন এবং ওজনযুক্ত সমষ্টির জন্য দুটি বিমূর্ত বেস ফ্যাক্টরি ক্লাস রয়েছে। তাদের create পদ্ধতি মানের প্রকার স্বাক্ষর নেয় সমষ্টিগত করা এবং ফেরৎ tff.templates.AggregationProcess যেমন মূল্যবোধের অ্যাগ্রিগেশন জন্য।

দ্বারা নির্মিত প্রক্রিয়া tff.aggregators.UnweightedAggregationFactory (1) সার্ভার এ রাষ্ট্র এবং (2) নির্দিষ্ট ধরনের মান: দুটি ইনপুট আর্গুমেন্ট লাগে value_type

একটি উদাহরণ বাস্তবায়ন tff.aggregators.SumFactory

দ্বারা নির্মিত প্রক্রিয়া tff.aggregators.WeightedAggregationFactory (1) সার্ভার এ রাষ্ট্র, (2) নির্দিষ্ট ধরনের মান: তিন ইনপুট আর্গুমেন্ট লাগে value_type এবং (3) টাইপ ওজন weight_type যখন তার invoking যেমন কারখানা এর ব্যবহারকারী দ্বারা নির্দিষ্ট, create পদ্ধতি।

একটি উদাহরণ বাস্তবায়ন tff.aggregators.MeanFactory যা ভরযুক্ত গড় নির্ণয় করে।

ফ্যাক্টরি প্যাটার্ন হল কিভাবে আমরা উপরে উল্লিখিত প্রথম লক্ষ্য অর্জন করি; যে সমষ্টি একটি স্বাধীন বিল্ডিং ব্লক. উদাহরণ স্বরূপ, কোন মডেলের ভেরিয়েবলগুলি প্রশিক্ষণযোগ্য তা পরিবর্তন করার সময়, একটি জটিল সমষ্টিকে পরিবর্তন করার প্রয়োজন হয় না; যখন যেমন একটি পদ্ধতি দ্বারা ব্যবহৃত এটা প্রতিনিধিত্বমূলক কারখানা বিভিন্ন ধরনের স্বাক্ষর সহ প্রার্থনা করা হবে tff.learning.build_federated_averaging_process

রচনা

মনে রাখবেন যে একটি সাধারণ একত্রীকরণ প্রক্রিয়া (ক) ক্লায়েন্টে মানগুলির কিছু প্রিপ্রসেসিং, (খ) ক্লায়েন্ট থেকে সার্ভারে মানগুলির গতিবিধি এবং (গ) সার্ভারে সমষ্টিগত মানের কিছু পোস্টপ্রসেসিং করতে পারে। দ্বিতীয় সর্বোপরি, অ্যাগ্রিগেশন রচনা বিবৃত লক্ষ্য, ভিতরে নিরূপিত হয় tff.aggregators স্ট্রাকচারিং দ্বারা মডিউল অ্যাগ্রিগেশন কারখানা যেমন বাস্তবায়ন যে অংশ (খ) অন্য অ্যাগ্রিগেশন কারখানা অর্পণ করা যেতে পারে।

একটি একক ফ্যাক্টরি ক্লাসের মধ্যে সমস্ত প্রয়োজনীয় যুক্তি প্রয়োগ করার পরিবর্তে, বাস্তবায়নগুলি ডিফল্টভাবে একত্রিতকরণের জন্য প্রাসঙ্গিক একক দিকের উপর দৃষ্টি নিবদ্ধ করে। যখন প্রয়োজন হয়, তখন এই প্যাটার্নটি আমাদেরকে একবারে একটি বিল্ডিং ব্লক প্রতিস্থাপন করতে সক্ষম করে।

একটি উদাহরণ ভরযুক্ত হয় tff.aggregators.MeanFactory । এটির বাস্তবায়ন ক্লায়েন্টদের প্রদত্ত মান এবং ওজনকে গুণ করে, তারপর স্বাধীনভাবে ওজনযুক্ত মান এবং ওজন উভয়ই যোগ করে, এবং তারপরে সার্ভারে ওজনের যোগফল দ্বারা ওজনযুক্ত মানের যোগফলকে ভাগ করে। সরাসরি ব্যবহার করে summations বাস্তবায়নের পরিবর্তে tff.federated_sum অপারেটর, সমষ্টি দুই স্থানেই অর্পণ করা হয় tff.aggregators.SumFactory

এই ধরনের কাঠামো দুটি ডিফল্ট যোগফলকে বিভিন্ন কারখানা দ্বারা প্রতিস্থাপিত করা সম্ভব করে, যা যোগফলকে ভিন্নভাবে উপলব্ধি করে। উদাহরণস্বরূপ, একটি tff.aggregators.SecureSumFactory বা একটি কাস্টম বাস্তবায়ন tff.aggregators.UnweightedAggregationFactory । বিপরীতভাবে, সময়, tff.aggregators.MeanFactory নিজেই অন্য কারখানার ইনার অ্যাগ্রিগেশন যেমন হতে পারে tff.aggregators.clipping_factory , যদি মান গড় আগে ছাঁটা যেতে চলেছে।

পূর্ববর্তী দেখুন টিউনিং শেখার জন্যে aggregations সুপারিশ বিদ্যমান কারখানায় ব্যবহার রচনা প্রক্রিয়া receommended ব্যবহারের জন্য টিউটোরিয়াল tff.aggregators মডিউল।

উদাহরণ দ্বারা সেরা অভ্যাস

আমরা চিত্রিত করতে যাচ্ছি tff.aggregators একটি সহজ উদাহরণ টাস্ক প্রয়োগ করে বিস্তারিতভাবে ধারণা, এবং এটি ধীরে ধীরে আরও সাধারণ ভুলবেন না। শেখার আরেকটি উপায় হল বিদ্যমান কারখানাগুলির বাস্তবায়নের দিকে নজর দেওয়া।

import collections
import tensorflow as tf
import tensorflow_federated as tff

পরিবর্তে summing এর value , উদাহরণস্বরূপ কাজের যোগফল হয় value * 2.0 এবং তারপর দ্বারা সমষ্টি ভাগ 2.0 । অ্যাগ্রিগেশন ফলাফলের এইভাবে গাণিতিকভাবে সরাসরি summing সমতূল্য value , এবং তিনটি অংশ নিয়ে গঠিত হিসেবে ভাবা যেতে পারে: (1) ক্লায়েন্ট এ স্কেলিং (2) ক্লায়েন্ট (3) সার্ভার এ unscaling জুড়ে summing।

নকশা উপরে বর্ণিত পর যুক্তিবিজ্ঞান একটি উপশ্রেণী হিসেবে বাস্তবায়িত হবে tff.aggregators.UnweightedAggregationFactory , যা উপযুক্ত সৃষ্টি tff.templates.AggregationProcess যখন একটি প্রদত্ত value_type সমষ্টিগত করুন:

ন্যূনতম বাস্তবায়ন

উদাহরণ টাস্কের জন্য, প্রয়োজনীয় গণনাগুলি সর্বদা একই থাকে, তাই রাষ্ট্র ব্যবহার করার কোন প্রয়োজন নেই। এটা এইভাবে খালি, এবং প্রতিনিধিত্ব করা হয় tff.federated_value((), tff.SERVER) । পরিমাপ জন্য একই ঝুলিতে, এখন জন্য.

কাজের ন্যূনতম বাস্তবায়ন এইভাবে নিম্নরূপ:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

সবকিছু প্রত্যাশিত হিসাবে কাজ করে কিনা তা নিম্নলিখিত কোড দিয়ে যাচাই করা যেতে পারে:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

স্টেটফুলনেস এবং পরিমাপ

স্টেটফুলনেস টিএফএফ-এ বিস্তৃতভাবে ব্যবহৃত হয় এমন গণনার প্রতিনিধিত্ব করার জন্য যা পুনরাবৃত্তিমূলকভাবে কার্যকর হবে এবং প্রতিটি পুনরাবৃত্তির সাথে পরিবর্তিত হবে। উদাহরণস্বরূপ, একটি শেখার গণনার অবস্থাতে শেখা মডেলের ওজন থাকে।

একটি সমষ্টি গণনাতে কীভাবে রাষ্ট্র ব্যবহার করতে হয় তা বোঝাতে, আমরা উদাহরণ টাস্কটি পরিবর্তন করি। পরিবর্তে গুন value দ্বারা 2.0 , আমরা সংখ্যাবৃদ্ধি এটা পুনরাবৃত্তির সূচক দ্বারা - যতবার অ্যাগ্রিগেশন মৃত্যুদন্ড কার্যকর করা হয়েছে।

এটি করার জন্য, আমাদের পুনরাবৃত্তি সূচকের ট্র্যাক রাখার একটি উপায় প্রয়োজন, যা রাষ্ট্রের ধারণার মাধ্যমে অর্জন করা হয়। ইন initialize_fn পরিবর্তে একটি খালি রাষ্ট্র তৈরি করার, আমরা রাষ্ট্র আরম্ভ স্কেলের শূন্য যাবে। এর পরে, রাষ্ট্র ব্যবহার করা যেতে পারে next_fn (1) দ্বারা বৃদ্ধি: তিনটি পদক্ষেপে 1.0 , (2) সংখ্যাবৃদ্ধি ব্যবহার value , এবং (3) নতুন আপডেটেড রাষ্ট্র হিসেবে প্রত্যাবর্তন।

একবার সম্পন্ন হলে, আপনি মনে রাখবেন করতে পারেন: কিন্তু ঠিক উপরে হিসাবে একই কোড আশানুরূপ কাজ যাচাই করার জন্য ব্যবহার করা যাবে। আমি কিভাবে জানি কিছু আসলে পরিবর্তিত হয়েছে?

ভাল প্রশ্ন! এখানেই পরিমাপের ধারণাটি কার্যকর হয়ে ওঠে। সাধারণভাবে, পরিমাপের একটি একক সঞ্চালনের প্রাসঙ্গিক কোনো মান প্রতিবেদন করতে পারেন next ফাংশন, যা পর্যবেক্ষণ জন্য ব্যবহার করা যেতে পারে। এই ক্ষেত্রে, এটা হতে পারে summed_value পূর্ববর্তী উদাহরণ থেকে। অর্থাৎ, "আনস্কেলিং" ধাপের আগের মান, যা পুনরাবৃত্তি সূচকের উপর নির্ভর করবে। আবার, এটি অগত্যা অনুশীলনে দরকারী নয়, তবে প্রাসঙ্গিক প্রক্রিয়াটি চিত্রিত করে।

টাস্কের রাষ্ট্রীয় উত্তরটি এইভাবে দেখায়:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

লক্ষ্য করুন state যে আসে next_fn ইনপুট হিসাবে সার্ভার স্থাপন করা হবে। অর্ডার ক্লায়েন্টদের এ ব্যবহার করার জন্য, এটি প্রথম, যোগাযোগ করা যা ব্যবহার অর্জিত হয় প্রয়োজন tff.federated_broadcast অপারেটর।

আশানুরূপ কাজ যাচাই করতে, আমরা এখন রিপোর্ট তাকান পারেন measurements , যা মৃত্যুদণ্ড প্রতিটি রাউন্ডে সঙ্গে বিভিন্ন হওয়া উচিত এমনকি যদি একই সঙ্গে রান client_data

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

কাঠামোগত প্রকার

ফেডারেটেড লার্নিংয়ে প্রশিক্ষিত একটি মডেলের মডেলের ওজন সাধারণত একটি একক টেনসরের পরিবর্তে টেনসরের সংগ্রহ হিসাবে উপস্থাপন করা হয়। TFF, এই হিসাবে প্রতিনিধিত্ব করা হয় tff.StructType এবং সাধারণত দরকারী অ্যাগ্রিগেশন কারখানা কাঠামোবদ্ধ ধরনের গ্রহণ করতে সক্ষম হতে হবে।

যাইহোক, উপরের উদাহরণগুলোতে, আমরা শুধুমাত্র একটি সঙ্গে কাজ tff.TensorType অবজেক্ট। আমরা সঙ্গে অ্যাগ্রিগেশন প্রক্রিয়া তৈরি করতে পূর্ববর্তী কারখানা ব্যবহার করতে চেষ্টা করুন tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , আমরা একটি অদ্ভুত ত্রুটি কারণ পেতে TensorFlow একটি গুন করতে চেষ্টা করবে tf.Tensor এবং একটি list

সমস্যা হল এর পরিবর্তে একটি ধ্রুবক দ্বারা tensors কাঠামো গুন, আমরা একটি ধ্রুবক দ্বারা গঠন প্রতিটি টেন্সর গুন প্রয়োজন। এই সমস্যার স্বাভাবিক সমাধান ব্যবহার করা tf.nest নির্মিত মডিউল ভিতরে tff.tf_computation গুলি।

পূর্ববর্তী সংস্করণ ExampleTaskFactory নিম্নরূপ কাঠামোবদ্ধ ধরনের সঙ্গে সামঞ্জস্যপূর্ণ এইভাবে দেখায়:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

এই উদাহরণটি একটি প্যাটার্ন হাইলাইট করে যা টিএফএফ কোড গঠন করার সময় অনুসরণ করা কার্যকর হতে পারে। খুব সহজ অপারেশন সঙ্গে তার আচরণ না করেন, তখন কোড আরো সহজপাঠ্য হয়ে যখন tff.tf_computation গুলি করে একটি ভিতরে ব্লক নির্মাণের হিসেবে ব্যবহার করা হবে tff.federated_computation একটি পৃথক জায়গায় নির্মিত হয়। এর ভিতর tff.federated_computation , এই বিল্ডিং ব্লক শুধুমাত্র স্বকীয় অপারেটর ব্যবহার করে সংযুক্ত করা হয়।

এটি প্রত্যাশিত হিসাবে কাজ করে যাচাই করতে:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

অভ্যন্তরীণ সমষ্টি

চূড়ান্ত পদক্ষেপ হল ঐচ্ছিকভাবে অন্যান্য কারখানায় প্রকৃত একত্রীকরণের অর্পণকে সক্ষম করা, যাতে বিভিন্ন একত্রীকরণ কৌশল সহজে তৈরি করা যায়।

এটি একটি ঐচ্ছিক তৈরি করে এটা করা যায় inner_factory আমাদের প্রস্ততকর্তার মধ্যে যুক্তি ExampleTaskFactory । যদি নির্দিষ্ট করা না, tff.aggregators.SumFactory ব্যবহার করা হয়, যা প্রযোজ্য tff.federated_sum পূর্বের বিভাগে সরাসরি ব্যবহার অপারেটর।

যখন create বলা হয়, আমরা প্রথমে কল করতে পারেন create এর inner_factory একই সঙ্গে ভেতরের অ্যাগ্রিগেশন প্রক্রিয়া তৈরি করতে value_type

আমাদের প্রক্রিয়া দ্বারা ফিরে রাজ্যের initialize_fn রাষ্ট্র দ্বারা "এই" প্রসেস সৃষ্টি এবং মাত্র নির্মিত ভেতরের প্রক্রিয়া হালচাল: দুটি অংশ একটি রচনা আছে।

বাস্তবায়ন next_fn যে প্রকৃত অ্যাগ্রিগেশন মধ্যে পৃথক অর্পণ করা হয় next ভেতরের প্রক্রিয়ার ফাংশন, এবং কিভাবে চূড়ান্ত আউটপুট গঠিত হয় না। রাষ্ট্র আবার "এই" এবং "ভেতরের" অবস্থায় গঠিত হয়, এবং পরিমাপ একটি হিসাবে একই উপায়ে গঠিত OrderedDict

নিম্নলিখিত এই ধরনের প্যাটার্ন একটি বাস্তবায়ন.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

যখন থেকে প্রতিনিধিরূপে inner_process.next ফাংশন, রিটার্ন গঠন আমরা পেতে একটি হল tff.templates.MeasuredProcessOutput - একই তিনটি ক্ষেত্র সঙ্গে, state , result এবং measurements । যখন স্থিরীকৃত অ্যাগ্রিগেশন প্রক্রিয়ার সামগ্রিক আগমন গঠন তৈরি করার সময়, state এবং measurements ক্ষেত্র সাধারণত ক্ষান্ত এবং একসঙ্গে ফিরিয়ে দিতে হবে। এর বিপরীতে, result মান ক্ষেত্র অনুরূপ সমষ্টিগত হচ্ছে এবং ক্ষান্ত অ্যাগ্রিগেশন পরিবর্তে "দিয়ে প্রবাহিত"।

state বস্তু কারখানার একটি বাস্তবায়ন বিস্তারিত হিসাবে দেখা উচিত, এবং এইভাবে রচনা কোনো কাঠামো হতে পারে। যাইহোক, measurements মিলা মান কিছু সময়ে ব্যবহারকারীর কাছে রিপোর্ট করতে হবে। অতএব, আমরা ব্যবহার সুপারিশ OrderedDict ক্ষান্ত যেমন নামকরণ এটি যেখানে রচনা একটি মেট্রিক রিপোর্ট থেকে আসে না স্পষ্ট হবে সঙ্গে।

আরো উল্লেখ্য ব্যবহার tff.federated_zip অপারেটর। state বস্তু সৃষ্টি প্রক্রিয়া দ্বারা contolled একটি হওয়া উচিত tff.FederatedType । আমরা পরিবর্তে ফিরে এসেছে এমন (this_state, inner_state) মধ্যে initialize_fn , তার রিটার্ন টাইপ স্বাক্ষর একটি হবে tff.StructType একটি 2-tuple ধারণকারী tff.FederatedType গুলি। ব্যবহারের tff.federated_zip "ওপরও" tff.FederatedType শীর্ষ স্তরের জন্য। এই একভাবে ব্যবহার করা হয় next_fn যখন প্রস্তুতি রাষ্ট্রীয় ও পরিমাপ ফেরত পাঠানো হয়।

অবশেষে, আমরা দেখতে পারি এটি কীভাবে ডিফল্ট অভ্যন্তরীণ সমষ্টির সাথে ব্যবহার করা যেতে পারে:

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... এবং একটি ভিন্ন অভ্যন্তরীণ সমষ্টি সঙ্গে. উদাহরণস্বরূপ, যদি একটি ExampleTaskFactory :

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

সারসংক্ষেপ

এই টিউটোরিয়ালে, আমরা একটি সাধারণ-উদ্দেশ্য একত্রীকরণ বিল্ডিং ব্লক তৈরি করার জন্য অনুসরণ করার সর্বোত্তম অনুশীলনগুলি ব্যাখ্যা করেছি, যা একটি সমষ্টি কারখানা হিসাবে উপস্থাপন করা হয়। সাধারণতা দুটি উপায়ে নকশা অভিপ্রায় মাধ্যমে আসে:

  1. প্যারামিটারাইজড গণনা। সমষ্টি একটি স্বাধীন বিল্ডিং ব্লক যে অন্যান্য TFF সঙ্গে কাজ করার পরিকল্পনা মডিউল মধ্যে প্লাগ করা যেতে পারে tff.aggregators যেমন তাদের প্রয়োজনীয় অ্যাগ্রিগেশন parameterize চাই, tff.learning.build_federated_averaging_process
  2. একত্রীকরণ রচনা। আরও জটিল যৌগিক সমষ্টি তৈরি করতে একটি সমষ্টি বিল্ডিং ব্লক অন্যান্য একত্রীকরণ বিল্ডিং ব্লকের সাথে তৈরি করা যেতে পারে।