Tff के ClientData के साथ कार्य करना।

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

क्लाइंट (जैसे उपयोगकर्ता) द्वारा कुंजीबद्ध डेटासेट की धारणा फ़ेडरेटेड गणना के लिए आवश्यक है जैसा कि TFF में मॉडल किया गया है। TFF इंटरफेस प्रदान करता है tff.simulation.datasets.ClientData इस अवधारणा से अधिक सार करने के लिए, और डेटासेट जो TFF मेजबान ( stackoverflow , शेक्सपियर , emnist , cifar100 , और gldv2 ) सभी इस इंटरफ़ेस को लागू।

आप अपने खुद के डाटासेट साथ फ़ेडरेटेड सीखने पर काम कर रहे हैं, तो TFF दृढ़ता से आप या तो लागू करने के लिए प्रोत्साहित करती है ClientData एक उत्पन्न करने के लिए TFF सहायक कार्यों का इंटरफ़ेस या उपयोग एक ClientData जो डिस्क पर आपके डेटा का प्रतिनिधित्व करता है, जैसे tff.simulation.datasets.ClientData.from_clients_and_fn .

TFF के अंत से अंत उदाहरण के अधिकांश के साथ शुरू ClientData वस्तुओं, को लागू करने ClientData अपने कस्टम डाटासेट के साथ इंटरफेस यह TFF के साथ लिखा मौजूदा कोड के माध्यम से spelunk को आसान कर देगा। इसके अलावा, tf.data.Datasets जो ClientData निर्माणों के ढांचे उपज के लिए सीधे से अधिक दोहराया जा सकता है numpy सरणियों, तो ClientData वस्तुओं TFF में जाने से पहले किसी भी अजगर आधारित एमएल ढांचे के साथ इस्तेमाल किया जा सकता।

ऐसे कई पैटर्न हैं जिनके साथ आप अपने जीवन को आसान बना सकते हैं यदि आप अपने सिमुलेशन को कई मशीनों तक बढ़ाने या उन्हें तैनात करने का इरादा रखते हैं। नीचे हम कई तरीकों में उपयोग कर सकते हैं के कुछ ही बताएगा कि ClientData और TFF हमारे छोटे पैमाने पर यात्रा करने के लिए बड़े पैमाने पर उत्पादन प्रयोग करने तैनाती अनुभव बनाने के लिए के रूप में संभव के रूप में चिकनी।

क्लाइंटडेटा को टीएफएफ में पास करने के लिए मुझे किस पैटर्न का उपयोग करना चाहिए?

हम TFF के दो प्रयोगों पर चर्चा करेंगे ClientData गहराई में; यदि आप नीचे दी गई दो श्रेणियों में से किसी एक में फिट होते हैं, तो आप स्पष्ट रूप से एक को दूसरे पर पसंद करेंगे। यदि नहीं, तो आपको अधिक सूक्ष्म विकल्प बनाने के लिए प्रत्येक के पेशेवरों और विपक्षों की अधिक विस्तृत समझ की आवश्यकता हो सकती है।

  • मैं स्थानीय मशीन पर जितनी जल्दी हो सके पुनरावृति करना चाहता हूं; मुझे टीएफएफ के वितरित रनटाइम का आसानी से लाभ उठाने में सक्षम होने की आवश्यकता नहीं है।

    • आप पास करना चाहते हैं tf.data.Datasets सीधे TFF करने के लिए।
    • यह आपको लाजि़मी कार्यक्रम के साथ की अनुमति देता है tf.data.Dataset वस्तुओं, और उन्हें मनमाने ढंग से संसाधित करते हैं।
    • यह नीचे दिए गए विकल्प की तुलना में अधिक लचीलापन प्रदान करता है; क्लाइंट्स को लॉजिक पुश करने के लिए आवश्यक है कि यह लॉजिक सीरियल करने योग्य हो।
  • मैं TFF के दूरस्थ रनटाइम में अपना फ़ेडरेटेड कंप्यूटेशन चलाना चाहता/चाहती हूँ, या मैं इसे शीघ्र ही करने की योजना बना रहा हूँ।

    • इस मामले में आप क्लाइंट के लिए डेटासेट निर्माण और प्रीप्रोसेसिंग को मैप करना चाहते हैं।
    • आप में यह परिणाम बस की एक सूची गुजर client_ids सीधे अपने फ़ेडरेटेड गणना करने के लिए।
    • ग्राहकों को डेटासेट निर्माण और प्रीप्रोसेसिंग को आगे बढ़ाने से क्रमांकन में आने वाली बाधाओं से बचा जाता है, और सैकड़ों-से-हजारों ग्राहकों के साथ प्रदर्शन में उल्लेखनीय वृद्धि होती है।

ओपन-सोर्स वातावरण सेट करें

पैकेज आयात करें

क्लाइंटडेटा ऑब्जेक्ट में हेरफेर करना

के लदान और TFF के EMNIST की खोज से शुरू करते हैं ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s]
2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

पहले डाटासेट निरीक्षण हमें बता सकते हैं उदाहरण के लिए किस प्रकार में हैं ClientData

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

ध्यान दें कि डाटासेट पैदावार collections.OrderedDict वस्तुओं है कि pixels और label चाबियाँ, जहां पिक्सल आकार के साथ एक टेन्सर है [28, 28] । मान लीजिए कि हमारे आकार के लिए बाहर हमारे आदानों समतल करना चाहते हैं [784] । एक संभव जिस तरह से हम यह कर सकते हैं हमारे लिए एक पूर्व प्रसंस्करण समारोह लागू करने के लिए किया जाएगा ClientData वस्तु।

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

हम कुछ और जटिल (और संभवतः स्टेटफुल) प्रीप्रोसेसिंग करने के अलावा चाहते हैं, उदाहरण के लिए फेरबदल।

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

एक साथ इंटरफ़ेस tff.Computation

अब जब हम साथ में कुछ बुनियादी जोड़तोड़ प्रदर्शन कर सकते हैं ClientData वस्तुओं, हम एक के लिए फ़ीड डेटा के लिए तैयार हैं tff.Computation । हम एक परिभाषित tff.templates.IterativeProcess जो लागू करता संघीय औसत का , और यह डेटा गुजर के विभिन्न तरीकों का पता लगाएं।

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

इससे पहले कि हम इस के साथ काम शुरू IterativeProcess , के शब्दों पर एक टिप्पणी ClientData क्रम में है। एक ClientData वस्तु आबादी फ़ेडरेटेड प्रशिक्षण, जो सामान्य है के लिए उपलब्ध की सम्पूर्णता का प्रतिनिधित्व करता है एक उत्पादन FL प्रणाली के निष्पादन के पर्यावरण के लिए उपलब्ध नहीं है और अनुकरण के लिए विशिष्ट है। ClientData वास्तव में उपयोगकर्ता बाईपास फ़ेडरेटेड कंप्यूटिंग के लिए क्षमता पूरी तरह से देता है और बस के माध्यम से हमेशा की तरह एक सर्वर साइड मॉडल को प्रशिक्षित ClientData.create_tf_dataset_from_all_clients

TFF का सिमुलेशन वातावरण शोधकर्ता को बाहरी लूप के पूर्ण नियंत्रण में रखता है। विशेष रूप से इसका तात्पर्य क्लाइंट की उपलब्धता, क्लाइंट ड्रॉपआउट आदि के बारे में है, जिसे उपयोगकर्ता या पायथन ड्राइवर स्क्रिप्ट द्वारा संबोधित किया जाना चाहिए। एक सकता है अपने से अधिक नमूने वितरण का समायोजन करके उदाहरण मॉडल ग्राहक छोड़ने वालों के लिए ClientData's client_ids ऐसा है कि और अधिक डेटा के साथ उपयोगकर्ताओं को (और तदनुसार स्थानीय संगणना अब चलने वाली) कम संभावना के साथ चुना जाएगा।

एक वास्तविक फ़ेडरेटेड सिस्टम में, हालांकि, मॉडल ट्रेनर द्वारा क्लाइंट्स को स्पष्ट रूप से नहीं चुना जा सकता है; क्लाइंट का चयन उस सिस्टम को सौंपा जाता है जो फ़ेडरेटेड गणना को क्रियान्वित कर रहा है।

पासिंग tf.data.Datasets TFF के लिए सीधे

एक विकल्प है कि हम एक के बीच इंटरफ़ेस के लिए है ClientData और एक IterativeProcess कि निर्माण की है tf.data.Datasets अजगर में, और TFF करने के लिए इन डेटासेट गुजर।

सूचना है कि अगर हम अपने preprocessed का उपयोग ClientData डेटासेट हम उपज उचित प्रकार हमारे मॉडल ऊपर परिभाषित द्वारा की उम्मीद की है।

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 2.9005744457244873, round time 4.576513767242432
loss 3.113278388977051, round time 0.49641919136047363
loss 2.7581865787506104, round time 0.4904160499572754
loss 2.87259578704834, round time 0.48976993560791016
loss 3.1202380657196045, round time 0.6724586486816406

हम यह रास्ता अपनाते हैं, तथापि, हम तुच्छता बहुयंत्र अनुकरण करने के लिए ले जाने के लिए नहीं कर सकेंगे। डेटासेट हम स्थानीय TensorFlow क्रम में निर्माण आसपास के अजगर वातावरण से राज्य पर कब्जा, और क्रमबद्धता या अक्रमांकन में असफल हो, जब वे संदर्भ राज्य उन्हें अब उपलब्ध नहीं है जो करने का प्रयास कर सकते हैं। यह TensorFlow से गूढ़ त्रुटि में उदाहरण के लिए प्रकट कर सकते हैं tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

ग्राहकों पर निर्माण और प्रीप्रोसेसिंग का मानचित्रण

इस समस्या से बचने के लिए, TFF अपने उपयोगकर्ताओं को डाटासेट इन्स्टेन्शियशन और कुछ है कि प्रत्येक ग्राहक पर स्थानीय रूप से होता है के रूप में preprocessing विचार करने के लिए, और TFF के सहायकों का उपयोग करें या करने के लिए सिफारिश की गई है federated_map स्पष्ट रूप से प्रत्येक ग्राहक पर इस preprocessing कोड को चलाने के लिए।

संकल्पनात्मक रूप से, इसे पसंद करने का कारण स्पष्ट है: टीएफएफ के स्थानीय रनटाइम में, क्लाइंट केवल "गलती से" वैश्विक पायथन पर्यावरण तक पहुंच पाते हैं क्योंकि इस तथ्य के कारण कि संपूर्ण फ़ेडरेटेड ऑर्केस्ट्रेशन एक मशीन पर हो रहा है। इस बिंदु पर यह ध्यान देने योग्य है कि समान सोच TFF के क्रॉस-प्लेटफ़ॉर्म, हमेशा-क्रमिक, कार्यात्मक दर्शन को जन्म देती है।

TFF के माध्यम से इस तरह के एक परिवर्तन सरल बना देता है ClientData's विशेषता dataset_computation , एक tff.Computation जो एक लेता है client_id और संबद्ध रिटर्न tf.data.Dataset

ध्यान दें कि preprocess बस के साथ काम करता dataset_computation ; dataset_computation preprocessed की विशेषता ClientData को शामिल किया गया संपूर्ण पूर्व प्रसंस्करण पाइपलाइन हम सिर्फ परिभाषित:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

हम आह्वान सकता dataset_computation और अजगर क्रम में एक उत्सुक डाटासेट प्राप्त करते हैं, लेकिन इस दृष्टिकोण की असली शक्ति का प्रयोग किया जाता है जब हम एक सतत प्रक्रिया है या किसी अन्य गणना सब पर वैश्विक उत्सुक क्रम में इन डेटासेट materializing से बचने के लिए के साथ लिखें। TFF एक सहायक समारोह प्रदान करता है tff.simulation.compose_dataset_computation_with_iterative_process जो वास्तव में यह करने के लिए इस्तेमाल किया जा सकता।

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

दोनों इस tff.templates.IterativeProcesses और उसी तरह से चलाने के ऊपर एक; लेकिन पूर्व preprocessed ग्राहक डेटासेट स्वीकार करता है, और बाद ग्राहक आईडी का प्रतिनिधित्व तार, दोनों डाटासेट निर्माण से निपटने और उसके शरीर में preprocessing स्वीकार करता है - वास्तव में state दोनों के बीच पारित किया जा सकता।

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023
loss 2.7670371532440186, round time 0.5207102298736572
loss 2.665048122406006, round time 0.5302855968475342
loss 2.7213189601898193, round time 0.5313887596130371
loss 2.580148935317993, round time 0.5283482074737549

बड़ी संख्या में ग्राहकों के लिए स्केलिंग

trainer_accepting_ids तुरंत TFF के बहुयंत्र क्रम में इस्तेमाल किया जा सकता है, और टाल materializing tf.data.Datasets और नियंत्रक (और इसलिए उन्हें serializing और श्रमिकों के लिए उन्हें बाहर भेज रहे हैं)।

यह वितरित सिमुलेशन को महत्वपूर्ण रूप से गति देता है, विशेष रूप से बड़ी संख्या में ग्राहकों के साथ, और समान क्रमबद्धता/deserialization ओवरहेड से बचने के लिए मध्यवर्ती एकत्रीकरण को सक्षम बनाता है।

वैकल्पिक डीपडाइव: TFF में मैन्युअल रूप से प्रीप्रोसेसिंग लॉजिक बनाना

TFF को जमीन से ऊपर तक संरचना के लिए डिज़ाइन किया गया है; TFF के सहायक द्वारा अभी-अभी की गई रचना का प्रकार पूरी तरह से उपयोगकर्ताओं के रूप में हमारे नियंत्रण में है। हम स्वयं हो सकता था preprocessing गणना हम सिर्फ साथ परिभाषित रचना ट्रेनर के अपने next काफी बस:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

वास्तव में, यह प्रभावी रूप से हमारे द्वारा उपयोग किया जाने वाला सहायक हुड के तहत कर रहा है (साथ ही उचित प्रकार की जांच और हेरफेर कर रहा है)। हम भी serializing से, कुछ अलग ढंग से एक ही तर्क व्यक्त किया जा सकता था preprocess_and_shuffle एक में tff.Computation , और सड़ते हुए federated_map एक कदम जो अन-preprocessed डेटासेट और एक अन्य जो रन बनाने का निर्माण में preprocess_and_shuffle प्रत्येक ग्राहक पर।

हम सत्यापित कर सकते हैं कि यह अधिक-मैनुअल पथ TFF के सहायक (मॉड्यूलो पैरामीटर नाम) के समान प्रकार के हस्ताक्षर के साथ गणना में परिणाम देता है:

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)