अपने स्थानीय TensorFlow के लिए RSVP आज हर जगह घटना!
इस पेज का अनुवाद Cloud Translation API से किया गया है.
Switch to English

पैरामीटर सर्वर प्रशिक्षण

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

पैरामीटर सर्वर प्रशिक्षण एक सामान्य डेटा-समानांतर विधि है जो कई मशीनों पर मॉडल प्रशिक्षण को बढ़ाता है। एक पैरामीटर सर्वर ट्रेनिंग क्लस्टर में श्रमिक और पैरामीटर सर्वर होते हैं। चर पैरामीटर सर्वर पर बनाए जाते हैं और उन्हें प्रत्येक चरण में श्रमिकों द्वारा पढ़ा और अपडेट किया जाता है। डिफ़ॉल्ट रूप से, कार्यकर्ता एक दूसरे के साथ सिंक्रनाइज़ किए बिना स्वतंत्र रूप से इन चर को पढ़ते हैं और अपडेट करते हैं। यही कारण है कि कभी-कभी पैरामीटर सर्वर-शैली प्रशिक्षण को अतुल्यकालिक प्रशिक्षण कहा जाता है।

TensorFlow 2 पैरामीटर सर्वर प्रशिक्षण tf.distribute.experimental.coordinator.ClusterCoordinator वर्ग के माध्यम से एक केंद्रीय-समन्वयक का उपयोग करता है।

इस कार्यान्वयन में worker और parameter server कार्य tf.distribute.Server s चलाते हैं जो समन्वयक के अनुरोधों को सुनते हैं। समन्वयक संसाधन बनाता है, प्रशिक्षण कार्यों को भेजता है, चौकियों को लिखता है, और कार्य विफलताओं से निपटता है।

हमारा मानना ​​है कि यह वास्तुकला और नई ClusterCoordinator क्लास एक अधिक लचीला और सरल प्रोग्रामिंग मॉडल प्रदान करती है।

ClusterCoordinator

ClusterCoordinator वर्ग को tf.distribute.Strategy ऑब्जेक्ट के साथ संयोजन के रूप में काम करने की आवश्यकता है। यह tf.distribute.Strategy वस्तु क्लस्टर की जानकारी पारित करने के लिए की जरूरत है और जैसा कि हम में देखा है एक प्रशिक्षण कदम को परिभाषित करने के लिए किया जाता है के साथ कस्टम प्रशिक्षण MirroredStrategyClusterCoordinator ऑब्जेक्ट तब दूरस्थ श्रमिकों के लिए इन प्रशिक्षण चरणों के निष्पादन को भेजता है। वर्तमान में, ClusterCoordinator केवल tf.distribute.experimental.ParameterServerStrategy साथ काम करता है।

ClusterCoordinator ऑब्जेक्ट द्वारा प्रदान किया गया सबसे महत्वपूर्ण API scheduleschedule API एक tf.function और भविष्य की तरह RemoteValue तुरंत RemoteValue है। कतारबद्ध कार्यों को पृष्ठभूमि के धागे में दूरदराज के श्रमिकों को भेजा जाएगा और उनके RemoteValue को अतुल्यकालिक रूप से भरा जाएगा। चूंकि schedule को वर्कर असाइनमेंट की आवश्यकता नहीं होती है, tf.function पारित tf.function को किसी भी उपलब्ध वर्कर पर निष्पादित किया जा सकता है। यदि इसके पूर्ण होने से पहले इसे निष्पादित कर दिया जाता है, तो यह कार्य किसी अन्य उपलब्ध कर्मी पर वापस ले लिया जाएगा। इस तथ्य और इस तथ्य के कारण कि फ़ंक्शन निष्पादन परमाणु नहीं है, एक फ़ंक्शन को एक से अधिक बार निष्पादित किया जा सकता है।

दूरस्थ कार्यों को भेजने के अलावा, ClusterCoordinator सभी श्रमिकों पर डेटासेट बनाने और इन श्रमिकों को फिर से बनाने में मदद करता है जब एक कार्यकर्ता विफलता से ClusterCoordinator

ट्यूटोरियल सेटअप

pip install -q portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

क्लस्टर सेटअप

जैसा कि ऊपर उल्लेख किया गया है, एक पैरामीटर सर्वर प्रशिक्षण क्लस्टर को एक समन्वयक कार्य की आवश्यकता होती है जो आपके प्रशिक्षण कार्यक्रम, एक या कई श्रमिकों और पैरामीटर सर्वर कार्यों को चलाता है जो TensorFlow सर्वर, अर्थात tf.distribute.Server , और संभवतः एक अतिरिक्त मूल्यांकन कार्य चलाता है जो साइड-कार चलाता है। मूल्यांकन (नीचे साइड-कार मूल्यांकन अनुभाग देखें)। उन्हें स्थापित करने की आवश्यकताएं हैं:

  • समन्वयक कार्य को मूल्यांकनकर्ता को छोड़कर अन्य सभी TensorFlow सर्वरों के पते और बंदरगाहों को जानने की आवश्यकता है।
  • श्रमिकों और पैरामीटर सर्वर को यह जानना होगा कि उन्हें किस पोर्ट को सुनने की आवश्यकता है। सादगी के लिए, हम आम तौर पर पूर्ण क्लस्टर जानकारी में गुजरते हैं जब हम इन कार्यों पर TensorFlow सर्वर बनाते हैं।
  • मूल्यांकनकर्ता कार्य को प्रशिक्षण क्लस्टर के सेटअप को जानने की आवश्यकता नहीं है। यदि ऐसा होता है, तो उसे प्रशिक्षण समूह से जुड़ने का प्रयास नहीं करना चाहिए।
  • श्रमिकों और पैरामीटर सर्वरों को क्रमशः "कार्यकर्ता" और "पीएस" के रूप में कार्य प्रकार होना चाहिए। समन्वयक को "प्रमुख" का उपयोग विरासत के कारणों के लिए कार्य प्रकार के रूप में करना चाहिए।

इस ट्यूटोरियल में, हम इन-प्रोसेस क्लस्टर बनाएंगे ताकि कोलाब में पूरे पैरामीटर सर्वर ट्रेनिंग को चलाया जा सके। हम बाद के अनुभाग में असली क्लस्टर सेट करने का तरीका बताएंगे।

प्रक्रिया में क्लस्टर

इस ट्यूटोरियल में, हम पहले से TensorFlow सर्वर का एक गुच्छा शुरू करेंगे और बाद में उनसे जुड़ेंगे:

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

कस्टम ट्रेनिंग लूप के साथ प्रशिक्षण

tf.distribute.Strategy साथ कस्टम प्रशिक्षण पाश प्रशिक्षण छोरों को परिभाषित करने के लिए महान लचीलापन प्रदान करता है। वर्तमान में TensorFlow 2 में पैरामीटर सर्वर प्रशिक्षण के लिए, केवल कस्टम प्रशिक्षण लूप समर्थित है। यहाँ हम एक प्रशिक्षण चरण को परिभाषित करने के लिए ParameterServerStrategy का उपयोग करते हैं और फिर दूरस्थ कर्मचारियों को प्रशिक्षण चरणों के निष्पादन को भेजने के लिए ClusterCoordinator का उपयोग करते हैं।

ParameterServerStrategy बनाएँ

कस्टम प्रशिक्षण पाश में एक प्रशिक्षण कदम लिखने के लिए, पहला कदम एक ParameterServerStrategy बनाना है। हम बाद में variable_partitioner में बताएंगे।

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:20829', 'localhost:21744'], 'worker': ['localhost:15899', 'localhost:17275', 'localhost:17846']})

फिर आप एक मॉडल बनाएंगे, एक डेटासेट और एक स्टेप फंक्शन को परिभाषित करेंगे जैसा कि हमने प्रशिक्षण पाश में अन्य tf.distribute.Strategy s के साथ देखा है। आप इस ट्यूटोरियल में अधिक जानकारी पा सकते हैं। आइए इन घटकों को निम्नलिखित चरणों में बनाएँ:

डेटा सेट करें

सबसे पहले, एक फ़ंक्शन लिखें जो कि एक डेटासेट बनाता है जिसमें केरस प्रीप्रोसेसिंग परतों द्वारा कार्यान्वित प्रीप्रोसेसिंग लॉजिक शामिल है। हम बाहर इन परतों का निर्माण करेगा dataset_fn लेकिन अंदर परिवर्तन लागू dataset_fn जब से तुम लपेटो जाएगा dataset_fn एक में tf.function जो चर इसके अंदर बनाने की अनुमति नहीं देता है।

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

एक डाटासेट में खिलौना उदाहरण उत्पन्न करें:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

फिर हम डेटासेट में लिपटे प्रशिक्षण डेटासेट बनाते हैं:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

मॉडल बनाएं

दूसरा, हम मॉडल और अन्य ऑब्जेक्ट बनाते हैं। strategy.scope तहत सभी चर बनाने के लिए सुनिश्चित करें।

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

प्रशिक्षण कदम को परिभाषित करें

तीसरा, एक tf.function में लिपटा हुआ प्रशिक्षण चरण tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

उपरोक्त चरण फ़ंक्शन में, कॉलिंग strategy.run और strategy.reduce में step_fn भविष्य में GPU या एकाधिक प्रतिकृतियां कार्यकर्ता का समर्थन करने के लिए उपयोगी हैं, हालांकि उनके पास इस समय तुच्छ कार्यान्वयन है।

दूरस्थ श्रमिकों के लिए प्रशिक्षण कदमों का प्रेषण

ParameterServerStrategy द्वारा सभी ClusterCoordinator को परिभाषित किए जाने के बाद, हम संसाधन बनाने और दूरस्थ श्रमिकों को प्रशिक्षण चरण वितरित करने के लिए ClusterCoordinator वर्ग का उपयोग करेंगे।

आइए पहले एक ClusterCoordinator ऑब्जेक्ट बनाएं और रणनीति ऑब्जेक्ट में पास करें:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

फिर हम एक प्रति-कार्यकर्ता डेटासेट और एक पुनरावृत्ति बनाते हैं। में per_worker_dataset_fn नीचे, लपेटकर dataset_fn में strategy.distribute_datasets_from_function वैकल्पिक है, लेकिन यह भविष्य में मूल GPUs के लिए कुशल प्रीफेचिंग समर्थन जब GPUs के द्वारा समर्थित हैं की अनुमति देगा ParameterServerStrategy

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

अंतिम चरण schedule का उपयोग करके दूरस्थ श्रमिकों को गणना वितरित करना है। schedule मेथड tf.function को tf.function और भविष्य की तरह RemoteValue तुरंत RemoteValue है। कतारबद्ध कार्यों को पृष्ठभूमि के धागे में दूरस्थ श्रमिकों के लिए भेजा जाएगा और RemoteValue को अतुल्यकालिक रूप से भरा जाएगा। join विधि का उपयोग तब तक किया जा सकता है जब तक कि सभी शेड्यूल्ड फ़ंक्शंस एक्सटेंड नहीं हो जाते।

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:1/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/job:ps/replica:0/task:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.462500.
Finished epoch 1, accuracy is 0.925000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

यहां बताया गया है कि आप RemoteValue का परिणाम कैसे प्राप्त कर सकते हैं:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.015665

वैकल्पिक रूप से, आप सभी चरणों को लॉन्च कर सकते हैं और पूरा होने की प्रतीक्षा करते हुए कुछ कर सकते हैं:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

इस विशेष उदाहरण के लिए पूर्ण प्रशिक्षण और सेवारत वर्कफ़्लो के लिए, कृपया इस परीक्षण को देखें

डाटासेट निर्माण के बारे में अधिक

उपरोक्त कोड में डाटासेट create_per_worker_dataset API का उपयोग करके बनाया गया है। यह प्रति कार्यकर्ता एक डेटासेट बनाता है और एक कंटेनर ऑब्जेक्ट लौटाता है। आप प्रति कार्यकर्ता पुनरावृत्ति बनाने के लिए उस पर iter विधि कह सकते हैं। प्रति-कार्यकर्ता इटरेटर में प्रति कार्यकर्ता एक इटरेटर होता है और एक वर्कर के संबंधित स्लाइस को किसी विशेष वर्कर पर कार्य निष्पादित होने से पहले schedule विधि को दिए गए फ़ंक्शन के इनपुट तर्क में प्रतिस्थापित किया जाएगा।

वर्तमान में schedule विधि मानती है कि कार्यकर्ता बराबर हैं और इस प्रकार विभिन्न कर्मचारियों पर डेटासेट समान हैं, सिवाय इसके कि वे अलग-अलग फेरबदल किए जा सकते हैं अगर उनमें डेटासेट ऑपरेशन हो। इस वजह से, हम डेटासेट्स को अनिश्चित काल तक दोहराए जाने की सलाह देते हैं और एक डेटासेट से OutOfRangeError पर भरोसा करने के बजाय कई चरणों को सीमित करते हैं।

एक और महत्वपूर्ण नोट यह है कि tf.data डेटासेट कार्य सीमा के दौरान निहित क्रमांकन और tf.data समर्थन नहीं करता है। इसलिए create_per_worker_dataset गए फ़ंक्शन के अंदर संपूर्ण डेटासेट बनाना महत्वपूर्ण है।

चर तेज

परिवर्तनीय शार्किंग एक चर को कई छोटे चर में विभाजित करने के लिए संदर्भित करता है। हम इन छोटे चर को sd कहते हैं। इन शार्क को एक्सेस करते समय नेटवर्क लोड वितरित करने के लिए वैरिएबल शार्किंग उपयोगी हो सकती है। यह कई पैरामीटर सर्वरों में एक सामान्य चर की गणना और भंडारण को वितरित करने के लिए भी उपयोगी है।

चर sharding सक्षम करने के लिए, आप एक में पारित कर सकते हैं variable_partitioner जब एक निर्माण ParameterServerStrategy वस्तु। variable_partitioner हर बार सक्रिय किया जाएगा जब एक चर बनाई गई है और यह चर के प्रत्येक आयाम के साथ टुकड़े की संख्या लौटने की उम्मीद है। कुछ बाहर के बॉक्स variable_partitioner रों रूप में इस तरह प्रदान की जाती हैं tf.distribute.experimental.partitioners.FixedShardsPartitioner

उपरोक्त उदाहरण में, हम FixedShardsPartitioner उपयोग करते हैं जो सभी चर को दो FixedShardsPartitioner विभाजित करेगा और प्रत्येक FixedShardsPartitioner को विभिन्न पैरामीटर सर्वरों को सौंपा जाएगा:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

जब एक variable_partitioner पास किया जाता है और यदि आप सीधे strategy.scope() तहत एक वैरिएबल बनाते हैं, तो यह एक variables प्रॉपर्टी के साथ एक कंटेनर प्रकार बन जाएगा जो कि शार्क की सूची तक पहुंच प्रदान करता है। ज्यादातर मामलों में, यह कंटेनर स्वचालित रूप से सभी शार्क को समतल करके एक Tensor में बदल जाएगा। नतीजतन, यह एक सामान्य चर के रूप में इस्तेमाल किया जा सकता है। दूसरी ओर, कुछ TensorFlow तरीके जैसे tf.nn.embedding_lookup इस कंटेनर प्रकार के लिए कुशल कार्यान्वयन प्रदान करते हैं और इन विधियों में स्वत: tf.nn.embedding_lookup से बचा जाएगा।

कृपया अधिक विवरण के लिए ParameterServerStrategy का एपीआई डॉकस्ट्रिंग देखें।

मूल्यांकन

वितरित प्रशिक्षण में मूल्यांकन लूप को परिभाषित करने और चलाने के लिए एक से अधिक तरीके हैं। नीचे वर्णित के रूप में प्रत्येक के पास अपने स्वयं के पेशेवरों और विपक्ष हैं। यदि आपके पास वरीयता नहीं है, तो इनलाइन मूल्यांकन विधि की सिफारिश की जाती है।

इनलाइन मूल्यांकन

इस पद्धति में समन्वयक प्रशिक्षण और मूल्यांकन के बीच वैकल्पिक होता है और इस प्रकार हम इसे इनलाइन मूल्यांकन कहते हैं। इनलाइन मूल्यांकन के कई लाभ हैं। उदाहरण के लिए, यह बड़े मूल्यांकन मॉडल और मूल्यांकन डेटासेट का समर्थन कर सकता है जो एक भी कार्य नहीं कर सकता है। एक अन्य उदाहरण के लिए, मूल्यांकन परिणाम का उपयोग अगले युग के प्रशिक्षण के लिए निर्णय लेने के लिए किया जा सकता है।

इनलाइन मूल्यांकन को लागू करने के दो तरीके हैं:

  • प्रत्यक्ष मूल्यांकन - छोटे मॉडल और मूल्यांकन डेटासेट के लिए समन्वयक समन्वयक पर मूल्यांकन डेटासेट के साथ सीधे वितरित मॉडल पर मूल्यांकन चला सकते हैं:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

  • वितरित मूल्यांकन - बड़े मॉडल या डेटासेट के लिए जो समन्वयक पर सीधे चलाने के लिए अलग-अलग हैं, समन्वयक कार्य schedule / join विधियों के माध्यम से श्रमिकों को मूल्यांकन कार्य वितरित कर सकते हैं:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

साइड-कार मूल्यांकन

एक अन्य विधि को साइड-कार मूल्यांकन कहा जाता है जो एक समर्पित मूल्यांकनकर्ता कार्य बनाने के लिए है जो बार-बार चौकियों को पढ़ता है और एक नवीनतम चेकपॉइंट पर मूल्यांकन चलाता है। यह आपके प्रशिक्षण कार्यक्रम को जल्दी समाप्त करने की अनुमति देता है यदि आपको मूल्यांकन परिणामों के आधार पर अपने प्रशिक्षण पाश को बदलने की आवश्यकता नहीं है। हालाँकि, मूल्यांकन को ट्रिगर करने के लिए एक अतिरिक्त मूल्यांकनकर्ता कार्य और आवधिक जाँच की आवश्यकता होती है। निम्नलिखित एक संभावित साइड-कार मूल्यांकन लूप है:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

वास्तविक दुनिया में क्लस्टर

एक वास्तविक उत्पादन वातावरण में, आप विभिन्न मशीनों पर विभिन्न प्रक्रियाओं में सभी कार्यों को चलाएंगे। प्रत्येक कार्य पर क्लस्टर जानकारी को कॉन्फ़िगर करने का सबसे सरल तरीका "TF_CONFIG" पर्यावरण चर सेट करना और "TF_CONFIG" को पार्स करने के लिए TFConfigClusterResolver का उपयोग TFConfigClusterResolver है। "TF_CONFIG" पर्यावरण चर के बारे में सामान्य विवरण के लिए, कृपया वितरित प्रशिक्षण मार्गदर्शिका देखें

यदि आप कुबेरनेट्स या अन्य कॉन्फ़िगरेशन टेम्प्लेट का उपयोग करके अपने प्रशिक्षण कार्यों को शुरू करते हैं, तो यह बहुत संभावना है कि ये टेम्पलेट आपके लिए "TF_CONFIG" पहले ही सेट कर चुके हैं।

"TF_CONFIG" पर्यावरण चर सेट करें

मान लीजिए कि आपके पास 3 कार्यकर्ता और 2 पैरामीटर सर्वर हैं, तो कार्यकर्ता का "TF_CONFIG" हो सकता है:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
   "task": {"type": "worker", "index": 1}
})

मूल्यांकनकर्ता का "TF_CONFIG" हो सकता है:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
   "task": {"type": "evaluator", "index": 0}
})
है

मूल्यांकनकर्ता के लिए उपरोक्त "TF_CONFIG" स्ट्रिंग में "क्लस्टर" भाग वैकल्पिक है।

यदि आप सभी कार्यों के लिए एक ही बाइनरी का उपयोग करते हैं

यदि आप एकल बाइनरी का उपयोग करके इन सभी कार्यों को चलाना पसंद करते हैं, तो आपको शुरुआत में ही अपनी प्रोग्राम शाखा को अलग-अलग भूमिकाओं में जाने देना होगा:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

निम्न कोड TensorFlow सर्वर शुरू करता है और इंतजार करता है:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

cluster_resolver = tf.distribute.cluster_resolver.TF_ConfigClusterResolver()
server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

कार्य विफलता को संभालना

कार्यकर्ता की विफलता

जैसा कि ऊपर उल्लेख किया गया है, ClusterCoordinator ने श्रमिक विफलता के लिए अंतर्निहित गलती सहिष्णुता है। वर्कर रिकवरी पर, create_per_worker_dataset द्वारा बनाए गए डेटासेट के संबंधित स्लाइस जो अभी भी स्कोप में हैं, उन्हें dataset_fn इसके मूल dataset_fn को create_per_worker_dataset करके फिर से बनाया जाएगा।

पैरामीटर सर्वर या समन्वयक विफलता

हालाँकि, जब समन्वयक एक पैरामीटर सर्वर त्रुटि को देखता है, तो यह तुरंत UnavailableError त्रुटि या AbortedError बढ़ा देगा। आप इस मामले में समन्वयक को पुनः आरंभ कर सकते हैं। समन्वयक स्वयं भी अनुपलब्ध हो सकता है। इसलिए, प्रशिक्षण प्रगति के बहुत कुछ नहीं खोने के लिए, प्रशिक्षण शुरू होने से पहले मॉडल चर को समय-समय पर जांचना और चेकपॉइंट से मॉडल चर को लोड करना महत्वपूर्ण है, यदि कोई हो। यदि एक ऑप्टिमाइज़र की जांच की जाती है, तो प्रशिक्षण प्रगति optimizer.iterations से लगभग अनुमानित हो सकती है।

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

एक RemoteValue ला रहा है

यदि किसी फ़ंक्शन को सफलतापूर्वक निष्पादित किया जाता है, तो RemoteValue प्राप्त करना सफल होने की गारंटी है। ऐसा इसलिए है क्योंकि वर्तमान में किसी फ़ंक्शन के निष्पादित होने के बाद रिटर्न वैल्यू तुरंत समन्वयक को कॉपी कर दी जाती है। यदि प्रतिलिपि के दौरान कोई कार्यकर्ता विफलता है, तो फ़ंक्शन किसी अन्य उपलब्ध कार्यकर्ता पर वापस ले लिया जाएगा। इसलिए, यदि आप प्रदर्शन के लिए अनुकूलन करना चाहते हैं, तो आप रिटर्न मान के बिना फ़ंक्शन शेड्यूल कर सकते हैं।

त्रुटि की सूचना देना

एक बार समन्वयक को पैरामीटर सर्वर से UnavailableError त्रुटि या अन्य एप्लिकेशन त्रुटियों जैसे कि tf.debugging.check_numerics से एक InvalidArgument जैसे त्रुटि दिखाई tf.debugging.check_numerics , यह त्रुटि उठाने से पहले सभी लंबित और कतारबद्ध कार्यों को रद्द कर देगा। लाई जा रही है उनकी संगत RemoteValue एक बढ़ा देंगे CancelledError

एक त्रुटि उठाए जाने के बाद, समन्वयक उसी त्रुटि या रद्द किए गए कार्यों से कोई त्रुटि नहीं बढ़ाएगा।

प्रदर्शन में सुधार

यदि आप ParameterServerStrategy और ClusterResolver साथ प्रशिक्षण करते हैं तो प्रदर्शन के मुद्दों को देखने के कई संभावित कारण हैं।

एक सामान्य कारण पैरामीटर सर्वर में असंतुलित भार है और कुछ भारी-भरकम पैरामीटर वाले सर्वर क्षमता तक पहुँच चुके हैं। इसके कई मूल कारण भी हो सकते हैं। इस समस्या को कम करने के लिए कुछ सरल तरीके हैं

  1. एक निर्दिष्ट करने के माध्यम से अपने बड़े मॉडल चर ठीकरा variable_partitioner जब एक निर्माण ParameterServerStrategy
  2. हॉटस्पॉट वैरिएबल बनाने से बचें, जो संभव हो तो एक ही चरण में सभी पैरामीटर सर्वर द्वारा आवश्यक है। उदाहरण के लिए, एक सतत सीखने की दर या उपवर्ग tf.keras.optimizers.schedules.LearningRateSchedule । का उपयोग करें। ऑप्टिमाइज़र ऑप्टिमाइज़र में डिफ़ॉल्ट व्यवहार के बाद से tf.keras.optimizers.schedules.LearningRateSchedule है कि सीखने की दर एक विशेष पैरामीटर सर्वर पर रखा गया चर बन जाएगा और प्रत्येक चरण में अन्य सभी पैरामीटर सर्वरों के लिए अनुरोध किया जाएगा। ।
  3. Keras प्रीप्रोसेसिंग परतों में उन्हें पास करने से पहले अपनी बड़ी वोकैब्युलरीज़ को फेरबदल करें।

प्रदर्शन के मुद्दों का एक अन्य संभावित कारण समन्वयक है। schedule / join का हमारा पहला कार्यान्वयन पायथन-आधारित है और इस प्रकार ओवरहेडिंग हो सकता है। इसके अलावा समन्वयक और श्रमिकों के बीच विलंबता बड़ी हो सकती है। अगर ऐसा है, तो आप कई चरणों को एक एकल tf.function में पैक कर सकते हैं:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

हम समन्वयक का अनुकूलन करते रहेंगे और उम्मीद करते हैं कि अधिकांश उपयोगकर्ताओं को भविष्य में मैन्युअल रूप से कदम नहीं उठाने होंगे।

इसके अलावा, प्रदर्शन में सुधार के लिए एक छोटी सी चाल ऊपर दिए गए कार्य विफलता अनुभाग में बताए अनुसार रिटर्न वैल्यू के बिना फ़ंक्शन शेड्यूल करना है।

ज्ञात सीमाएँ

अधिकांश ज्ञात सीमाएं उपरोक्त वर्गों में शामिल हैं। यहाँ एक सारांश है:

  • os.environment["grpc_fail_fast"]="use_caller" की जरूरत है हर कार्य पर, समन्वयक सहित, दोष सहिष्णुता को ठीक से काम करने के लिए।
  • GPU कार्यकर्ता समर्थित नहीं हैं।
  • सिंक्रोनस पैरामीटर सर्वर प्रशिक्षण समर्थित नहीं है।
  • ParameterServerStrategy Keras compile और fit API के साथ काम नहीं करती है।
  • ClusterCoordinator.schedule किसी डेटासेट के लिए विज़िट की गारंटी का समर्थन नहीं करता है।
  • जब ClusterCoordinator.create_per_worker_dataset का उपयोग किया जाता है, तो पूरे डेटासेट को इसके पास दिए गए फ़ंक्शन के अंदर बनाया जाना चाहिए।
  • आमतौर पर इष्टतम प्रदर्शन प्राप्त करने के लिए एक ही फ़ंक्शन में कई चरणों को पैक करना आवश्यक है।
  • यह tf.saved_model.load के माध्यम से tf.saved_model.load गए_मॉडल को लोड करने के लिए समर्थित नहीं है जिसमें शार्प किए गए चर हैं। ध्यान दें कि इस तरह के save_model को TensorFlow सर्विंग के उपयोग से लोड करने की उम्मीद है।
  • यह एक चेकपॉइंट containg शार्प किए हुए ऑप्टिमाइज़र स्लॉट वैरिएबल को अलग-अलग संख्या में शार्क को लोड करने के लिए समर्थित नहीं है।
  • यह समन्वयक कार्य को पुनरारंभ किए बिना पैरामीटर सर्वर विफलता से उबरने के लिए समर्थित नहीं है।