ParameterServerStrategy के साथ पैरामीटर सर्वर प्रशिक्षण

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

पैरामीटर सर्वर प्रशिक्षण एक आम डेटा समानांतर कई मशीनों पर मॉडल प्रशिक्षण अप पैमाने पर करने की विधि है।

एक पैरामीटर सर्वर प्रशिक्षण क्लस्टर कार्यकर्ताओं और पैरामीटर सर्वर के होते हैं। चर पैरामीटर सर्वर पर बनाए जाते हैं और उन्हें प्रत्येक चरण में श्रमिकों द्वारा पढ़ा और अद्यतन किया जाता है। डिफ़ॉल्ट रूप से, कार्यकर्ता इन चरों को एक दूसरे के साथ सिंक्रनाइज़ किए बिना स्वतंत्र रूप से पढ़ते और अद्यतन करते हैं। यही कारण है कि कभी कभी सर्वर-शैली के प्रशिक्षण पैरामीटर अतुल्यकालिक प्रशिक्षण कहा जाता है।

TensorFlow 2 में, पैरामीटर सर्वर प्रशिक्षण के द्वारा संचालित है tf.distribute.experimental.ParameterServerStrategy वर्ग है, जो एक क्लस्टर कि श्रमिकों के हजारों करने के लिए स्केल को प्रशिक्षण चरणों वितरित करता है (पैरामीटर सर्वर के साथ)।

समर्थित प्रशिक्षण विधियां

दो मुख्य समर्थित प्रशिक्षण विधियां हैं:

नौकरियों और कार्यों के साथ एक समूह

पसंद के एपीआई (की परवाह किए बिना Model.fit या एक कस्टम प्रशिक्षण पाश), TensorFlow 2 में वितरित प्रशिक्षण शामिल है: एक 'cluster' कई के साथ 'jobs' , और नौकरियों में से हर एक या एक से अधिक हो सकता है 'tasks'

पैरामीटर सर्वर प्रशिक्षण का उपयोग करते समय, इसकी अनुशंसा की जाती है:

  • एक समन्वयक काम (जो कार्य का नाम है chief )
  • एकाधिक कर्मी के पास कार्य (काम नाम worker ); तथा
  • एकाधिक पैरामीटर सर्वर नौकरियों (काम नाम ps )

समन्वयक संसाधन, कार्य प्रशिक्षण डिस्पैच बनाता है, वहीं चौकियों लिखते हैं, और कार्य विफलताओं, श्रमिकों और पैरामीटर सर्वर के साथ सौदों चलाने tf.distribute.Server कि समन्वयक के अनुरोधों के लिए सुन।

साथ पैरामीटर सर्वर प्रशिक्षण Model.fit एपीआई

साथ पैरामीटर सर्वर प्रशिक्षण Model.fit एपीआई समन्वयक एक का उपयोग करने की आवश्यकता है tf.distribute.experimental.ParameterServerStrategy वस्तु, और एक tf.keras.utils.experimental.DatasetCreator इनपुट के रूप में। करने के लिए इसी तरह के Model.fit कोई रणनीति के साथ, या अन्य रणनीतियों के साथ, कार्यप्रवाह बनाने शामिल है और मॉडल संकलन, कॉलबैक, एक के बाद की तैयारी उपयोग Model.fit कॉल।

कस्टम प्रशिक्षण लूप के साथ पैरामीटर सर्वर प्रशिक्षण

कस्टम प्रशिक्षण छोरों के साथ, tf.distribute.experimental.coordinator.ClusterCoordinator वर्ग समन्वयक के लिए इस्तेमाल महत्वपूर्ण घटक है।

  • ClusterCoordinator वर्ग एक के साथ संयोजन के रूप में काम करने की जरूरत है tf.distribute.Strategy वस्तु।
  • यह tf.distribute.Strategy वस्तु समूह के बारे में जानकारी प्रदान करने की जरूरत है और के रूप में में प्रदर्शन, एक प्रशिक्षण कदम को परिभाषित करने के लिए किया जाता है tf.distribute.Strategy के साथ कस्टम प्रशिक्षण
  • ClusterCoordinator वस्तु फिर दूरदराज के श्रमिकों के लिए इन प्रशिक्षण चरणों के निष्पादन डिस्पैचिज।
  • पैरामीटर सर्वर प्रशिक्षण के लिए, ClusterCoordinator एक साथ काम करने की जरूरत है tf.distribute.experimental.ParameterServerStrategy

सबसे महत्वपूर्ण द्वारा प्रदान की एपीआई ClusterCoordinator वस्तु है schedule :

  • schedule एपीआई एक enqueues tf.function और एक भविष्य की तरह रिटर्न RemoteValue तुरंत।
  • पंक्तिबद्ध कार्यों पृष्ठभूमि धागे में दूरदराज के श्रमिकों के लिए भेजा जाएगा और उनके RemoteValue रों एसिंक्रोनस रूप से भरे जाएंगे।
  • चूंकि schedule कार्यकर्ता काम की आवश्यकता नहीं है, tf.function में पारित किसी भी उपलब्ध कार्यकर्ता पर क्रियान्वित किया जा सकता।
  • यदि जिस कार्यकर्ता पर इसे निष्पादित किया गया है, वह पूरा होने से पहले अनुपलब्ध हो जाता है, तो फ़ंक्शन को किसी अन्य उपलब्ध कार्यकर्ता पर पुनः प्रयास किया जाएगा।
  • इस तथ्य और इस तथ्य के कारण कि फ़ंक्शन निष्पादन परमाणु नहीं है, एक फ़ंक्शन को एक से अधिक बार निष्पादित किया जा सकता है।

दूरदराज के कार्यों भेजने के अलावा, ClusterCoordinator भी सभी कार्यकर्ताओं पर डेटासेट बना सकते हैं और इन डेटासेट के पुनर्निर्माण के लिए मदद करता है जब असफलता से एक कार्यकर्ता ठीक हो जाए।

ट्यूटोरियल सेटअप

ट्यूटोरियल में शाखा जाएगा Model.fit और कस्टम प्रशिक्षण पाश पथ, और आप एक है कि अपनी आवश्यकताओं फिट बैठता है चुन सकते हैं। "एक्स के साथ प्रशिक्षण" के अलावा अन्य अनुभाग दोनों पथों पर लागू होते हैं।

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

क्लस्टर सेटअप

जैसा कि ऊपर उल्लेख, एक पैरामीटर सर्वर प्रशिक्षण क्लस्टर एक समन्वयक कार्य है कि अपने प्रशिक्षण कार्यक्रम, एक या कई कार्यकर्ताओं और पैरामीटर सर्वर कार्य चलाने TensorFlow servers- चलाता है की आवश्यकता है tf.distribute.Server -और संभवतः एक अतिरिक्त मूल्यांकन कार्य है कि रन पक्ष कार मूल्यांकन (नीचे साइड-कार मूल्यांकन अनुभाग देखें)। उन्हें स्थापित करने की आवश्यकताएं हैं:

  • समन्वयक कार्य को मूल्यांकनकर्ता को छोड़कर अन्य सभी TensorFlow सर्वरों के पते और बंदरगाहों को जानना होगा।
  • श्रमिकों और पैरामीटर सर्वरों को यह जानने की जरूरत है कि उन्हें किस पोर्ट को सुनना है। सादगी के लिए, आप आमतौर पर इन कार्यों पर TensorFlow सर्वर बनाते समय पूरी क्लस्टर जानकारी पास कर सकते हैं।
  • मूल्यांकनकर्ता कार्य को प्रशिक्षण क्लस्टर के सेटअप को जानने की आवश्यकता नहीं है। यदि ऐसा होता है, तो उसे प्रशिक्षण क्लस्टर से जुड़ने का प्रयास नहीं करना चाहिए।
  • श्रमिक और पैरामीटर सर्वर के रूप में कार्य प्रकार होना चाहिए "worker" और "ps" , क्रमशः। समन्वयक का उपयोग करना चाहिए "chief" विरासत कारणों के लिए कार्य प्रकार के रूप में।

इस ट्यूटोरियल में, आप एक इन-प्रोसेस क्लस्टर बनाएंगे ताकि संपूर्ण पैरामीटर सर्वर प्रशिक्षण कोलाब में चलाया जा सके। आप कैसे स्थापित करने के लिए सीखना होगा असली समूहों के लिए एक बाद खंड में।

इन-प्रोसेस क्लस्टर

आप पहले से कई TensorFlow सर्वर बनाकर शुरू करेंगे और बाद में उनसे जुड़ेंगे। नोट यह केवल इस ट्यूटोरियल के प्रदर्शन के प्रयोजन के लिए है कि, और वास्तविक प्रशिक्षण में सर्वर पर शुरू कर दिया हो जाएगा "worker" और "ps" मशीनों।

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

प्रक्रिया में क्लस्टर सेटअप अक्सर इस तरह के रूप में, इकाई परीक्षण में प्रयोग किया जाता है यहाँ

स्थानीय परीक्षण के लिए एक और विकल्प बाहर स्थानीय मशीन की जांच पर प्रक्रियाओं शुरू करने के लिए है Keras साथ मल्टी कार्यकर्ता प्रशिक्षण इस दृष्टिकोण का एक उदाहरण के लिए।

एक ParameterServerStrategy को तुरंत चालू करें

इससे पहले कि आप प्रशिक्षण कोड में गोता लगाने, के एक दृष्टांत जाने ParameterServerStrategy वस्तु। ध्यान दें कि यह आप के साथ कार्यवाही कर रहे हैं, चाहे जरूरत है Model.fit या एक कस्टम प्रशिक्षण पाश। variable_partitioner तर्क में समझाया जाएगा चर sharding अनुभाग

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

प्रशिक्षण के लिए GPU का उपयोग करने के लिए, प्रत्येक कार्यकर्ता को दृश्यमान GPU आवंटित करें। ParameterServerStrategy प्रतिबंध के साथ, प्रत्येक कार्यकर्ता पर सभी उपलब्ध GPUs का उपयोग करेगा कि सभी कार्यकर्ताओं उपलब्ध GPUs की एक ही नंबर होना चाहिए।

वेरिएबल शार्डिंग

चर sharding बंटवारे के लिए कई छोटे चर, जो टुकड़े कहा जाता है में एक चर संदर्भित करता है। इन शार्क तक पहुँचने पर नेटवर्क लोड को वितरित करने के लिए वेरिएबल शार्डिंग उपयोगी हो सकती है। यह कई पैरामीटर सर्वरों में एक सामान्य चर की गणना और भंडारण को वितरित करने के लिए भी उपयोगी है।

चर sharding सक्षम करने के लिए, आप एक में पारित कर सकते हैं variable_partitioner जब एक निर्माण ParameterServerStrategy वस्तु। variable_partitioner हर बार सक्रिय किया जाएगा जब एक चर बनाई गई है और यह चर के प्रत्येक आयाम के साथ टुकड़े की संख्या लौटने की उम्मीद है। कुछ बाहर के बॉक्स variable_partitioner रों रूप में इस तरह प्रदान की जाती हैं tf.distribute.experimental.partitioners.MinSizePartitioner । ऐसा लगता है कि आकार के आधार पर partitioners उपयोग करने के लिए सिफारिश की है tf.distribute.experimental.partitioners.MinSizePartitioner छोटे चर, जो मॉडल प्रशिक्षण गति पर नकारात्मक प्रभाव पड़ सकता विभाजन से बचने के लिए।

एक जब variable_partitioner में पारित हो जाता है और यदि आप के तहत सीधे एक चर बनाने strategy.scope() , यह एक के साथ एक कंटेनर प्रकार हो जाएगा variables संपत्ति जो टुकड़े की सूची तक पहुँच प्रदान करता। ज्यादातर मामलों में, यह कंटेनर सभी शार्क को जोड़कर स्वचालित रूप से एक टेंसर में परिवर्तित हो जाएगा। नतीजतन, इसे एक सामान्य चर के रूप में इस्तेमाल किया जा सकता है। दूसरी ओर, इस तरह के रूप में कुछ TensorFlow तरीकों tf.nn.embedding_lookup इस कंटेनर प्रकार के लिए और इन तरीकों स्वचालित संयोजन से बचा जाना होगा में कुशल कार्यान्वयन प्रदान करते हैं।

कृपया के एपीआई दस्तावेज़ देख सकेंगे tf.distribute.experimental.ParameterServerStrategy अधिक जानकारी के लिए।

साथ प्रशिक्षण Model.fit

Keras के माध्यम से एक आसान से उपयोग प्रशिक्षण एपीआई प्रदान करता है Model.fit कि हैंडल हुड के नीचे प्रशिक्षण पाश, overridable के लचीलेपन के साथ train_step , और कॉलबैक, जो इस प्रकार चौकी बचत या सारांश TensorBoard के लिए बचत के रूप में कार्य प्रदान करते हैं। साथ Model.fit , समान अभ्यास कोड रणनीति वस्तु का एक सरल स्वैप के साथ अन्य रणनीतियों के लिए इस्तेमाल किया जा सकता है।

इनपुट डेटा

Model.fit पैरामीटर सर्वर प्रशिक्षण के साथ की आवश्यकता है कि इनपुट डेटा एक प्रतिदेय उस प्रकार के एक भी तर्क लेता में प्रदान किया tf.distribute.InputContext , और एक रिटर्न tf.data.Dataset । फिर, एक बनाने tf.keras.utils.experimental.DatasetCreator उद्देश्य यह है कि इस तरह के लेता callable , और एक वैकल्पिक tf.distribute.InputOptions के माध्यम से आपत्ति input_options तर्क।

ध्यान दें कि यह शफ़ल और पैरामीटर सर्वर प्रशिक्षण के साथ डेटा को दोहराने, और निर्दिष्ट करने के लिए सिफारिश की है steps_per_epoch में fit तो पुस्तकालय युग सीमाओं को जानता है कॉल।

कृपया देखें वितरित इनपुट बारे में अधिक जानकारी के लिए ट्यूटोरियल InputContext तर्क।

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

में कोड dataset_fn इनपुट डिवाइस है, जो आमतौर सीपीयू है, कार्यकर्ता मशीनों में से प्रत्येक पर पर सक्रिय किया जाएगा।

मॉडल निर्माण और संकलन

अब, आप एक पैदा करेगा tf.keras.Model -एक तुच्छ tf.keras.models.Sequential प्रदर्शन के उद्देश्य से-पीछा एक से के लिए मॉडल Model.compile कॉल इस तरह के एक अनुकूलक, मीट्रिक या इस तरह के रूप पैरामीटर के रूप में घटकों, शामिल करने के लिए steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

कॉलबैक और प्रशिक्षण

इससे पहले कि आप फोन model.fit वास्तविक प्रशिक्षण के लिए, इस तरह जैसे सामान्य कार्य के लिए आवश्यक कॉलबैक तैयार करते हैं:

  • ModelCheckpoint : मॉडल वेट को बचाने के लिए।
  • BackupAndRestore : सुनिश्चित करें कि प्रशिक्षण प्रगति स्वचालित रूप से समर्थित है, और अगर बरामद बनाने के लिए क्लस्टर अनुभवों अनुपलब्धता (जैसे बीच में बंद करें या पूर्वक्रय के रूप में); या
  • TensorBoard : सारांश फ़ाइलें, जो TensorBoard उपकरण में कल्पना करने के में प्रगति रिपोर्ट को बचाने के लिए।
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

साथ सीधा उपयोग ClusterCoordinator (वैकल्पिक)

यहां तक कि अगर आप चुनते Model.fit प्रशिक्षण पथ, आप वैकल्पिक एक दृष्टांत कर सकते हैं tf.distribute.experimental.coordinator.ClusterCoordinator वस्तु अन्य कार्यों आप कार्यकर्ताओं पर निष्पादित किया जाना चाहते हैं शेड्यूल करने के लिए। देखें एक कस्टम प्रशिक्षण पाश के साथ प्रशिक्षण अधिक जानकारी और उदाहरण के लिए खंड।

एक कस्टम प्रशिक्षण लूप के साथ प्रशिक्षण

के साथ कस्टम प्रशिक्षण छोरों का उपयोग tf.distribute.Strategy महान लचीलापन प्रशिक्षण छोरों को परिभाषित करने देता है। साथ ParameterServerStrategy ऊपर परिभाषित (के रूप में strategy ), आप एक का उपयोग करेगा tf.distribute.experimental.coordinator.ClusterCoordinator दूरदराज के कार्यकर्ताओं को प्रशिक्षण चरणों के निष्पादन प्रेषण करने के लिए।

उसके बाद, आप, एक मॉडल का निर्माण करेगा, एक डाटासेट और एक कदम समारोह को परिभाषित के रूप में आप के साथ अन्य प्रशिक्षण पाश में किया है tf.distribute.Strategy रों। आप में और अधिक जानकारी प्राप्त कर सकते tf.distribute.Strategy के साथ कस्टम प्रशिक्षण ट्यूटोरियल।

कुशल डाटासेट प्रीफेचिंग सुनिश्चित करने के लिए उपयोग करें डाटासेट निर्माण एपीआई में उल्लेख वितरित की सिफारिश की दूरदराज के श्रमिकों के लिए डिस्पैच प्रशिक्षण चरणों अनुभाग देखें। इसके अलावा, कॉल करने के लिए सुनिश्चित करें कि Strategy.run अंदर worker_fn श्रमिकों के लिए आवंटित GPUs का पूरा लाभ लेने के लिए। शेष चरण GPU के साथ या उसके बिना प्रशिक्षण के लिए समान हैं।

आइए इन घटकों को निम्नलिखित चरणों में बनाएं:

डेटा सेट करें

सबसे पहले, एक समारोह है कि एक डाटासेट कि द्वारा कार्यान्वित तर्क preprocessing भी शामिल है बनाता है लिखने Keras preprocessing परतों

आप बाहर इन परतों का निर्माण करेगा dataset_fn अंदर लेकिन लागू परिवर्तन dataset_fn , जब से तुम लपेटो जाएगा dataset_fn एक में tf.function , जो चर इसके अंदर बनाने की अनुमति नहीं देता है।

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

डेटासेट में खिलौना उदाहरण जेनरेट करें:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

फिर, एक में लिपटे प्रशिक्षण डाटासेट बनाने dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

मॉडल बनाएं

इसके बाद, मॉडल और अन्य ऑब्जेक्ट बनाएं। के तहत सभी चर बनाने के लिए सुनिश्चित करें कि strategy.scope

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

आइए पुष्टि करें कि के उपयोग FixedShardsPartitioner दो टुकड़े में सभी चर विभाजित है और प्रत्येक ठीकरा अलग पैरामीटर सर्वर को सौंपा गया था:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

प्रशिक्षण चरण को परिभाषित करें

तीसरा, एक में लपेटा प्रशिक्षण चरण बनाना tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

ऊपर प्रशिक्षण कदम समारोह में, बुला Strategy.run और Strategy.reduce में step_fn कार्यकर्ता प्रति कई GPUs समर्थन कर सकते हैं। श्रमिकों GPUs आवंटित किया है, Strategy.run कई प्रतिकृतियां पर डेटासेट वितरित करेंगे।

दूरस्थ श्रमिकों को प्रशिक्षण कदम भेजें

आखिर संगणना द्वारा परिभाषित कर रहे ParameterServerStrategy , आप का उपयोग करेगा tf.distribute.experimental.coordinator.ClusterCoordinator संसाधनों बनाने के लिए वर्ग और दूरदराज के कार्यकर्ताओं को प्रशिक्षण चरणों वितरित करते हैं।

की पहली एक बनाएँ ClusterCoordinator वस्तु और रणनीति वस्तु में पारित:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

फिर, प्रति-कार्यकर्ता डेटासेट और एक पुनरावर्तक बनाएं। में per_worker_dataset_fn नीचे, लपेटकर dataset_fn में strategy.distribute_datasets_from_function GPUs के लिए कुशल प्रीफेचिंग मूल अनुमति देने के लिए सिफारिश की है।

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

अंतिम चरण का उपयोग कर दूरस्थ श्रमिकों के लिए गणना वितरित करने के लिए है ClusterCoordinator.schedule :

  • schedule विधि एक enqueues tf.function और एक भविष्य की तरह रिटर्न RemoteValue तुरंत। पंक्तिबद्ध कार्यों पृष्ठभूमि धागे में दूरदराज के श्रमिकों के लिए भेजा जाएगा और RemoteValue एसिंक्रोनस रूप से भरे जाएंगे।
  • join विधि ( ClusterCoordinator.join ) जब तक सभी अनुसूचित कार्यों क्रियान्वित कर रहे हैं प्रतीक्षा करने के लिए इस्तेमाल किया जा सकता।
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

यहाँ कैसे आप एक का परिणाम प्राप्त कर सके है RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

वैकल्पिक रूप से, आप सभी चरणों को लॉन्च कर सकते हैं और पूरा होने की प्रतीक्षा करते हुए कुछ कर सकते हैं:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

पूरा प्रशिक्षण और इस विशिष्ट उदाहरण के लिए की सेवा कार्यप्रवाह के लिए, यह बाहर की जाँच कृपया परीक्षण

डेटासेट निर्माण के बारे में अधिक जानकारी

उपरोक्त कोड में डेटासेट का उपयोग करके बनाई गई है ClusterCoordinator.create_per_worker_dataset एपीआई)। यह प्रति कार्यकर्ता एक डेटासेट बनाता है और एक कंटेनर ऑब्जेक्ट देता है। आप कॉल कर सकते हैं iter एक प्रति कार्यकर्ता इटरेटर बनाने के लिए उस पर विधि। प्रति कार्यकर्ता इटरेटर कार्यकर्ता प्रति एक इटरेटर होता है और एक कार्यकर्ता की इसी टुकड़ा समारोह के लिए पारित के इनपुट तर्क में प्रतिस्थापित किया जाएगा ClusterCoordinator.schedule विधि समारोह एक विशेष कार्यकर्ता पर निष्पादित किया जाता है से पहले।

वर्तमान में, ClusterCoordinator.schedule विधि मान लिया गया श्रमिकों के बराबर हैं और इस तरह मानता विभिन्न कार्यकर्ताओं पर डेटासेट ही हैं, केवल वे शफ़ल किया जा सकता अलग तरह से अगर वे एक को शामिल Dataset.shuffle आपरेशन। इस वजह से, यह भी सिफारिश की है कि डेटासेट अनिश्चित काल के दोहराया जाना और आप के बजाय पर निर्भर के कदम की एक सीमित संख्या अनुसूची OutOfRangeError एक डाटासेट से।

एक अन्य महत्वपूर्ण ध्यान दें कि है tf.data डेटासेट कार्य सीमाओं के पार निहित क्रमबद्धता और अक्रमांकन समर्थन नहीं करते। तो यह कार्य करने के लिए पारित कर दिया अंदर पूरे डाटासेट बनाने के लिए महत्वपूर्ण है ClusterCoordinator.create_per_worker_dataset

मूल्यांकन

वितरित प्रशिक्षण में मूल्यांकन लूप को परिभाषित करने और चलाने के एक से अधिक तरीके हैं। नीचे वर्णित अनुसार प्रत्येक के अपने फायदे और नुकसान हैं। यदि आपकी कोई प्राथमिकता नहीं है, तो इनलाइन मूल्यांकन पद्धति की अनुशंसा की जाती है।

इनलाइन मूल्यांकन

इस विधि में, प्रशिक्षण और मूल्यांकन और इस प्रकार यह यह कहा जाता है इनलाइन मूल्यांकन के बीच समन्वयक विकल्पों।

इनलाइन मूल्यांकन के कई लाभ हैं। उदाहरण के लिए:

  • यह बड़े मूल्यांकन मॉडल और मूल्यांकन डेटासेट का समर्थन कर सकता है जो एक एकल कार्य नहीं कर सकता है।
  • अगले युग के प्रशिक्षण के लिए निर्णय लेने के लिए मूल्यांकन परिणामों का उपयोग किया जा सकता है।

इनलाइन मूल्यांकन को लागू करने के दो तरीके हैं: प्रत्यक्ष मूल्यांकन और वितरित मूल्यांकन।

  • प्रत्यक्ष मूल्यांकन: छोटे मॉडल और मूल्यांकन डेटासेट के लिए, समन्वयक मूल्यांकन सीधे समन्वयक पर मूल्यांकन डाटासेट के साथ वितरित मॉडल पर चला सकते हैं:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • वितरित मूल्यांकन: बड़े मॉडल या डेटासेट कि समन्वयक पर सीधे चलाने के लिए अव्यवहार्य हैं, समन्वयक कार्य के माध्यम से श्रमिकों के लिए मूल्यांकन कार्य वितरित कर सकते हैं ClusterCoordinator.schedule / ClusterCoordinator.join तरीके:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

साइड-कार मूल्यांकन

एक अन्य विधि पक्ष कार मूल्यांकन जहां एक समर्पित मूल्यांकनकर्ता कार्य बनाएँ कि बार-बार चौकियों पढ़ता है और एक नवीनतम चौकी पर मूल्यांकन चलाता है कहा जाता है। यह आपके प्रशिक्षण कार्यक्रम को जल्दी समाप्त करने की अनुमति देता है यदि आपको मूल्यांकन परिणामों के आधार पर अपने प्रशिक्षण लूप को बदलने की आवश्यकता नहीं है। हालांकि, मूल्यांकन को ट्रिगर करने के लिए अतिरिक्त मूल्यांकनकर्ता कार्य और आवधिक जांच की आवश्यकता होती है। एक संभावित साइड-कार मूल्यांकन लूप निम्नलिखित है:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

वास्तविक दुनिया में क्लस्टर

एक वास्तविक उत्पादन वातावरण में, आप विभिन्न मशीनों पर विभिन्न प्रक्रियाओं में सभी कार्यों को चलाएंगे। प्रत्येक कार्य पर कॉन्फ़िगर क्लस्टर जानकारी के लिए सबसे आसान तरीका है स्थापित करने के लिए है "TF_CONFIG" वातावरण चर और एक का उपयोग tf.distribute.cluster_resolver.TFConfigClusterResolver पार्स करने के लिए "TF_CONFIG"

के बारे में एक सामान्य वर्णन के लिए "TF_CONFIG" वातावरण चर, का उल्लेख वितरित प्रशिक्षण गाइड।

आप Kubernetes या अन्य विन्यास टेम्पलेट का उपयोग कर अपने प्रशिक्षण कार्य शुरू करते हैं, तो बहुत संभव है इन टेम्पलेट्स पहले से ही निर्धारित किया है वह यह है कि “TF_CONFIG" आप के लिए।

सेट "TF_CONFIG" वातावरण चर

मान लीजिए आप 3 कार्यकर्ताओं और 2 पैरामीटर सर्वर है, "TF_CONFIG" कार्यकर्ता 1 के हो सकते हैं:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" मूल्यांकनकर्ता के हो सकते हैं:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" ऊपर में भाग "TF_CONFIG" मूल्यांकनकर्ता के लिए स्ट्रिंग वैकल्पिक है।

यदि आप सभी कार्यों के लिए एक ही बाइनरी का उपयोग करते हैं

यदि आप इन सभी कार्यों को एकल बाइनरी का उपयोग करके चलाना पसंद करते हैं, तो आपको शुरुआत में ही अपने प्रोग्राम की शाखा को विभिन्न भूमिकाओं में जाने देना होगा:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

निम्न कोड एक TensorFlow सर्वर शुरू करता है और प्रतीक्षा करता है:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

कार्य विफलता को संभालना

कार्यकर्ता विफलता

tf.distribute.experimental.coordinator.ClusterCoordinator या Model.fit प्रदान निर्मित कार्यकर्ता विफलता के लिए दोष सहिष्णुता। कार्यकर्ता वसूली करने पर, (या तो करने के लिए पूर्व में प्रदान की डाटासेट समारोह ClusterCoordinator.create_per_worker_dataset एक कस्टम प्रशिक्षण पाश के लिए, या tf.keras.utils.experimental.DatasetCreator के लिए Model.fit डेटासेट से बनाने पुनः) के कार्यकर्ताओं पर सक्रिय किया जाएगा।

पैरामीटर सर्वर या समन्वयक विफलता

हालांकि, जब समन्वयक एक पैरामीटर सर्वर त्रुटि देखता है, यह एक बढ़ा देंगे UnavailableError या AbortedError तुरंत। आप इस मामले में समन्वयक को पुनरारंभ कर सकते हैं। समन्वयक स्वयं भी अनुपलब्ध हो सकता है। इसलिए, प्रशिक्षण प्रगति को न खोने के लिए कुछ टूलिंग की सिफारिश की जाती है:

  • के लिए Model.fit , आप एक का उपयोग करना चाहिए BackupAndRestore कॉलबैक, जो प्रगति की बचत और बहाली स्वचालित रूप से संभालती है। देखें कॉलबैक और प्रशिक्षण एक उदाहरण के लिए ऊपर अनुभाग।

  • एक कस्टम प्रशिक्षण लूप के लिए, आपको समय-समय पर मॉडल चर की जांच करनी चाहिए और प्रशिक्षण शुरू होने से पहले एक चेकपॉइंट से मॉडल चर लोड करना चाहिए, यदि कोई हो। प्रशिक्षण प्रगति से लगभग निष्कर्ष निकाला जा सकता optimizer.iterations अगर एक अनुकूलक checkpointed जाता है:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

एक ला रहा है RemoteValue

एक ला रहा है RemoteValue अगर एक समारोह को सफलतापूर्वक क्रियान्वित किया जाता है सफल होने के लिए गारंटी है। ऐसा इसलिए है क्योंकि वर्तमान में फ़ंक्शन निष्पादित होने के बाद वापसी मूल्य तुरंत समन्वयक को कॉपी किया जाता है। यदि प्रतिलिपि के दौरान कोई कार्यकर्ता विफलता होती है, तो फ़ंक्शन को किसी अन्य उपलब्ध कर्मचारी पर पुनः प्रयास किया जाएगा। इसलिए, यदि आप प्रदर्शन के लिए अनुकूलित करना चाहते हैं, तो आप बिना रिटर्न वैल्यू के फ़ंक्शन शेड्यूल कर सकते हैं।

त्रुटि की सूचना देना

एक बार जब समन्वयक के रूप में इस तरह के एक त्रुटि देखता UnavailableError इस तरह के एक के रूप में पैरामीटर सर्वर या अन्य आवेदन त्रुटियों से InvalidArgument से tf.debugging.check_numerics , यह त्रुटि ऊपर उठाने से पहले सभी लंबित और पंक्तिबद्ध कार्यों रद्द हो जाएगा। लाई जा रही है उनकी संगत RemoteValue एक बढ़ा देंगे CancelledError

एक त्रुटि उठाए जाने के बाद, समन्वयक उसी त्रुटि या रद्द किए गए कार्यों से कोई त्रुटि नहीं उठाएगा।

प्रदर्शन में सुधार

वहाँ यदि आप प्रदर्शन के मुद्दों को देखने के लिए जब आप के साथ प्रशिक्षित कई संभावित कारण हैं ParameterServerStrategy और ClusterResolver

एक सामान्य कारण यह है कि पैरामीटर सर्वर में असंतुलित लोड होता है और कुछ भारी-भरकम पैरामीटर सर्वर क्षमता तक पहुँच जाते हैं। कई मूल कारण भी हो सकते हैं। इस समस्या को कम करने के कुछ सरल उपाय इस प्रकार हैं:

  1. एक निर्दिष्ट करने के माध्यम से अपने बड़े मॉडल चर ठीकरा variable_partitioner जब एक निर्माण ParameterServerStrategy
  2. यदि संभव हो तो एक ही चरण में सभी पैरामीटर सर्वरों के लिए आवश्यक हॉटस्पॉट वैरिएबल बनाने से बचें। उदाहरण के लिए, एक निरंतर सीखने की दर या उपवर्ग का उपयोग tf.keras.optimizers.schedules.LearningRateSchedule optimizers में के बाद से डिफ़ॉल्ट व्यवहार है कि शिक्षा दर एक चर प्रत्येक चरण में एक विशेष पैरामीटर सर्वर पर रखा जाता है और अन्य सभी पैरामीटर सर्वर से अनुरोध किया हो जाएंगे .
  3. अपनी बड़ी शब्दावली को केरस प्रीप्रोसेसिंग परतों में भेजने से पहले उन्हें फेरबदल करें।

प्रदर्शन समस्याओं का एक अन्य संभावित कारण समन्वयक है। का पहला कार्यान्वयन schedule / join अजगर आधारित है और इस तरह भूमि के ऊपर सूत्रण हो सकता है। साथ ही समन्वयक और श्रमिकों के बीच विलंबता बड़ी हो सकती है। यदि यह बात है तो,

  • के लिए Model.fit , आप सेट कर सकते हैं steps_per_execution पर प्रदान की जाती तर्क Model.compile 1 से भी बड़ा एक मूल्य के लिए।

  • एक कस्टम प्रशिक्षण पाश के लिए, आप एक एकल में कई चरणों पैक कर सकते हैं tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

जैसा कि पुस्तकालय को और अधिक अनुकूलित किया गया है, उम्मीद है कि अधिकांश उपयोगकर्ताओं को भविष्य में मैन्युअल रूप से चरणों को पैक करने की आवश्यकता नहीं होगी।

इसके अलावा, प्रदर्शन में सुधार के लिए एक छोटी सी तरकीब है बिना रिटर्न वैल्यू के कार्यों को शेड्यूल करना जैसा कि ऊपर दिए गए हैंडलिंग टास्क फेल्योर सेक्शन में बताया गया है।

ज्ञात सीमाएं

अधिकांश ज्ञात सीमाएँ पहले से ही उपरोक्त अनुभागों में शामिल हैं। यह खंड एक सारांश प्रदान करता है।

ParameterServerStrategy सामान्य

  • os.environment["grpc_fail_fast"]="use_caller" समन्वयक सहित हर काम पर की जरूरत है, दोष सहिष्णुता को ठीक से काम करने के लिए।
  • सिंक्रोनस पैरामीटर सर्वर प्रशिक्षण समर्थित नहीं है।
  • इष्टतम प्रदर्शन प्राप्त करने के लिए आमतौर पर एक ही फ़ंक्शन में कई चरणों को पैक करना आवश्यक होता है।
  • यह माध्यम से एक saved_model लोड करने के लिए समर्थित नहीं है tf.saved_model.load sharded चर हैं। TensorFlow सर्विंग का उपयोग करके ऐसे सहेजे गए_मॉडल को लोड करने पर ध्यान दें, काम करने की उम्मीद है।
  • यह एक चेकपॉइंट लोड करने के लिए समर्थित नहीं है जिसमें शार्प किए गए ऑप्टिमाइज़र स्लॉट वेरिएबल्स को अलग-अलग संख्या में शार्क में लोड किया जाता है।
  • यह समन्वयक कार्य को पुनरारंभ किए बिना पैरामीटर सर्वर विफलता से पुनर्प्राप्त करने के लिए समर्थित नहीं है।
  • का उपयोग tf.lookup.StaticHashTable (जो आमतौर पर कुछ लोगों द्वारा प्रयोग किया जाता है tf.keras.layers.experimental.preprocessing परतों, जैसे IntegerLookup , StringLookup , और TextVectorization पैरामीटर सर्वर प्रशिक्षण के साथ इस समय समन्वयक पर रखा संसाधनों में परिणाम)। यह आरपीसी को देखने के लिए श्रमिकों से लेकर समन्वयक तक के प्रदर्शन पर प्रभाव डालता है। यह संबोधित करने के लिए एक वर्तमान उच्च प्राथमिकता है।

Model.fit बारीकियों

  • steps_per_epoch तर्क में आवश्यक है Model.fit । आप एक ऐसे मान का चयन कर सकते हैं जो किसी युग में उपयुक्त अंतराल प्रदान करता हो।
  • ParameterServerStrategy प्रदर्शन कारणों से बैच स्तरीय कॉल है कि कस्टम कॉलबैक के लिए समर्थन नहीं है। आप उपयुक्त रूप से उठाया के साथ युग स्तर के कॉल में उन कॉल कन्वर्ट चाहिए steps_per_epoch इतना है कि वे हर कहा जाता है, steps_per_epoch चरणों की संख्या। अंतर्निहित कॉलबैक प्रभावित नहीं होते हैं: उनके बैच-स्तरीय कॉलों को निष्पादक होने के लिए संशोधित किया गया है। के लिए बैच स्तरीय कॉल सहायक ParameterServerStrategy की योजना बनाई जा रही है।
  • इसी कारण से, अन्य रणनीतियों के विपरीत, प्रगति बार और मेट्रिक्स केवल युग की सीमाओं पर लॉग किए जाते हैं।
  • run_eagerly समर्थित नहीं है।

कस्टम प्रशिक्षण लूप की बारीकियां

  • ClusterCoordinator.schedule एक डाटासेट के लिए मुलाक़ात की गारंटी देता है समर्थन नहीं करता।
  • जब ClusterCoordinator.create_per_worker_dataset प्रयोग किया जाता है, पूरे डाटासेट समारोह इसे करने के लिए पारित कर दिया अंदर बनाया जाना चाहिए।
  • tf.data.Options द्वारा बनाई गई किसी डेटासेट में नजरअंदाज कर दिया है ClusterCoordinator.create_per_worker_dataset