तिथि को रक्षित करें! Google I / O 18-20 मई को पंजीकृत करता है
इस पेज का अनुवाद Cloud Translation API से किया गया है.
Switch to English

केरस के साथ बहु-कार्यकर्ता प्रशिक्षण

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

यह ट्यूटोरियल विशेष रूप से tf.distribute.MultiWorkerMirroredStrategy API, विशेष रूप से tf.distribute.MultiWorkerMirroredStrategy का उपयोग करके tf.distribute.Strategy मॉडल के साथ बहु-कार्यकर्ता वितरित प्रशिक्षण प्रदर्शित करता है। इस रणनीति की मदद से, एक केरस मॉडल जिसे एकल-कार्यकर्ता पर चलाने के लिए डिज़ाइन किया गया था, वह न्यूनतम कोड परिवर्तन के साथ कई श्रमिकों पर मूल रूप से काम कर सकता है।

TensorFlow गाइड में वितरित प्रशिक्षण वितरण रणनीतियों के अवलोकन के लिए उपलब्ध है। tf.distribute.Strategy API की गहरी समझ के इच्छुक लोगों के लिए समर्थन करता है।

सेट अप

सबसे पहले, कुछ आवश्यक आयात।

import json
import os
import sys

TensorFlow आयात करने से पहले, पर्यावरण में कुछ बदलाव करें।

सभी GPU अक्षम करें। यह श्रमिकों के कारण होने वाली त्रुटियों को रोकता है, सभी एक ही GPU का उपयोग करने की कोशिश कर रहे हैं। एक वास्तविक आवेदन के लिए प्रत्येक कार्यकर्ता एक अलग मशीन पर होगा।

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG पर्यावरण चर को रीसेट करें, आप इसके बारे में बाद में देखेंगे।

os.environ.pop('TF_CONFIG', None)

सुनिश्चित करें कि वर्तमान निर्देशिका अजगर के मार्ग पर है। यह नोटबुक को बाद में %%writefile द्वारा लिखी गई फाइलों को आयात करने की अनुमति देता है।

if '.' not in sys.path:
  sys.path.insert(0, '.')

अब TensorFlow आयात करें।

import tensorflow as tf

डेटासेट और मॉडल परिभाषा

अगला एक साधारण मॉडल और डेटासेट सेटअप के साथ एक mnist.py फ़ाइल बनाएं। इस अजगर फ़ाइल को इस ट्यूटोरियल में कार्यकर्ता-प्रक्रियाओं द्वारा उपयोग किया जाएगा:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist.py

छोटी संख्या में युगों के लिए मॉडल को प्रशिक्षित करने का प्रयास करें और एक कार्यकर्ता के परिणामों का निरीक्षण करके सुनिश्चित करें कि सब कुछ सही ढंग से काम करता है। जैसे-जैसे प्रशिक्षण आगे बढ़ता है, नुकसान कम होना चाहिए और सटीकता बढ़नी चाहिए।

import mnist

batch_size = 64
single_worker_dataset = mnist.mnist_dataset(batch_size)
single_worker_model = mnist.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
Epoch 1/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2959 - accuracy: 0.0977
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2311 - accuracy: 0.2726
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1668 - accuracy: 0.4236
<tensorflow.python.keras.callbacks.History at 0x7f62c6ec0780>

बहु-कार्यकर्ता विन्यास

अब बहु-कार्यकर्ता प्रशिक्षण की दुनिया में प्रवेश करते हैं। TensorFlow में, TF_CONFIG पर्यावरण चर को कई मशीनों पर प्रशिक्षण के लिए आवश्यक है, जिनमें से प्रत्येक की संभवतः एक अलग भूमिका है। TF_CONFIG एक JSON स्ट्रिंग है जिसका उपयोग क्लस्टर के भाग के प्रत्येक कार्यकर्ता पर क्लस्टर कॉन्फ़िगरेशन को निर्दिष्ट करने के लिए किया जाता है।

यहाँ एक उदाहरण विन्यास है:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

यहां एक ही TF_CONFIG को JSON स्ट्रिंग के रूप में क्रमबद्ध किया गया है:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG दो घटक हैं: cluster और task

  • cluster सभी श्रमिकों के लिए समान है और प्रशिक्षण क्लस्टर के बारे में जानकारी प्रदान करता है, जो विभिन्न प्रकार की नौकरियों जैसे कि worker रूप में एक तानाशाही है। मल्टी-वर्कर प्रशिक्षण में MultiWorkerMirroredStrategy साथ, आमतौर पर एक worker होता है, जो एक नियमित worker क्या करता है, इसके अलावा चेकपॉइंट को बचाने और TensorBoard के लिए सारांश फ़ाइल लिखने जैसी थोड़ी अधिक जिम्मेदारी लेता है। इस तरह के एक कार्यकर्ता के रूप में जाना जाता है chief कार्यकर्ता, और यह प्रथागत है कि worker के साथ index 0 प्रमुख के रूप में नियुक्त किया जाता है worker (वास्तव में यह कैसे है tf.distribute.Strategy कार्यान्वित किया जाता है)।

  • task वर्तमान कार्य की जानकारी प्रदान करता है और प्रत्येक कार्यकर्ता पर अलग होता है। यह उस कार्यकर्ता के type और index को निर्दिष्ट करता है।

इस उदाहरण में, आप कार्य type को "worker" और टास्क index को 0 । यह मशीन पहली कार्यकर्ता है और मुख्य कार्यकर्ता के रूप में नियुक्त की जाएगी और दूसरों की तुलना में अधिक काम करेगी। ध्यान दें कि अन्य मशीनों के लिए TF_CONFIG पर्यावरण चर के रूप में अच्छी तरह से सेट करने की आवश्यकता होगी, और इसमें एक ही cluster TF_CONFIG होना चाहिए, लेकिन उन मशीनों की भूमिका क्या है, इसके आधार पर अलग-अलग कार्य type या कार्य index

चित्रण प्रयोजनों के लिए, यह ट्यूटोरियल दिखाता है कि कोई localhost पर 2 श्रमिकों के साथ TF_CONFIG कैसे सेट कर सकता है। व्यवहार में, उपयोगकर्ता बाहरी आईपी पते / बंदरगाहों पर कई श्रमिकों का निर्माण करेंगे, और उचित रूप से प्रत्येक कार्यकर्ता पर TF_CONFIG सेट TF_CONFIG

इस उदाहरण में आप 2 श्रमिकों का उपयोग करेंगे, पहले कार्यकर्ता का TF_CONFIG ऊपर दिखाया गया है। दूसरे कार्यकर्ता के लिए आप tf_config['task']['index']=1 सेट करेंगे

ऊपर, tf_config अजगर में सिर्फ एक स्थानीय चर है। प्रशिक्षण को कॉन्फ़िगर करने के लिए वास्तव में इसका उपयोग करने के लिए, इस शब्दकोश को JSON के रूप में क्रमबद्ध किया जाना चाहिए, और TF_CONFIG वातावरण चर में रखा TF_CONFIG चाहिए।

नोटबुक में पर्यावरण चर और उपप्रकार

उपप्रजाति अपने माता-पिता से पर्यावरण चर प्राप्त करते हैं। तो अगर आप इस jupyter notebook प्रक्रिया में एक पर्यावरण चर सेट करते हैं:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

आप एक उपप्रकार से पर्यावरण चर का उपयोग कर सकते हैं:

echo ${GREETINGS}
Hello TensorFlow!

अगले भाग में, आप इसका उपयोग कार्यकर्ता TF_CONFIG को TF_CONFIG पास करने के लिए TF_CONFIG । आप वास्तव में कभी भी इस तरह से अपनी नौकरियों का शुभारंभ नहीं करेंगे, लेकिन यह इस ट्यूटोरियल के उद्देश्यों के लिए पर्याप्त है: न्यूनतम बहु-कार्यकर्ता उदाहरण प्रदर्शित करने के लिए।

सही रणनीति चुनें

TensorFlow में वितरित प्रशिक्षण के दो मुख्य रूप हैं:

  • तुल्यकालिक प्रशिक्षण, जहां प्रशिक्षण के चरण श्रमिकों और प्रतिकृतियों में समन्वयित होते हैं, और
  • अतुल्यकालिक प्रशिक्षण, जहां प्रशिक्षण कदम कड़ाई से समन्वयित नहीं होते हैं।

MultiWorkerMirroredStrategy , जो तुल्यकालिक बहु-कार्यकर्ता प्रशिक्षण के लिए अनुशंसित रणनीति है, इस गाइड में प्रदर्शित किया जाएगा। मॉडल को प्रशिक्षित करने के लिए, tf.distribute.MultiWorkerMirroredStrategy का उपयोग करें।

MultiWorkerMirroredStrategy मॉडल के लेयर्स में सभी वेरिएबल्स की कॉपियों को सभी MultiWorkerMirroredStrategy पर प्रत्येक डिवाइस पर बनाता है। यह का उपयोग करता है CollectiveOps , सामूहिक संचार के लिए एक TensorFlow सेशन, कुल ढ़ाल के लिए और सिंक में चर रखने के लिए। tf.distribute.Strategy मार्गदर्शिका में इस रणनीति के बारे में अधिक विवरण हैं।

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy CommunicationOptions पैरामीटर के माध्यम से कई कार्यान्वयन प्रदान करता है। RING आर-आधारित सामूहिक को जीआरपीसी का उपयोग करके क्रॉस-होस्ट संचार परत के रूप में लागू करता है। NCCL सामूहिकता को लागू करने के लिए एनवीडिया के एनसीसीएल का उपयोग करता है। AUTO रनटाइम के लिए पसंद को धता बताता है। सामूहिक कार्यान्वयन का सबसे अच्छा विकल्प जीपीयू की संख्या और प्रकार पर निर्भर करता है, और नेटवर्क क्लस्टर में इंटरकनेक्ट करता है।

मॉडल को प्रशिक्षित करें

के एकीकरण के साथ tf.distribute.Strategy में एपीआई tf.keras , केवल बदल आप वितरित करने के लिए एक से अधिक कार्यकर्ताओं को प्रशिक्षण मॉडल के निर्माण और संलग्न है कर देगा model.compile() कॉल अंदर strategy.scope() । वितरण रणनीति का दायरा निर्धारित करता है कि चर कैसे और कहाँ बनाए जाते हैं, और MultiWorkerMirroredStrategy के मामले में, बनाए गए चर MirroredVariable किए गए वैरिएबल एस हैं, और उन्हें प्रत्येक कार्यकर्ता पर दोहराया जाता है।

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()

वास्तव में MultiWorkerMirroredStrategy साथ MultiWorkerMirroredStrategy आपको कार्यकर्ता प्रक्रियाओं को चलाने और उनके लिए TF_CONFIG पास करने की आवश्यकता होगी।

पहले लिखी गई mnist.py फ़ाइल की तरह, यहां मुख्य main.py जो प्रत्येक कार्यकर्ता चलाएगा:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

ऊपर दिए गए कोड स्निपेट में ध्यान दें कि global_batch_size , जो Dataset.batch को जाता है, per_worker_batch_size * num_workers सेट per_worker_batch_size * num_workers । यह सुनिश्चित करता है कि प्रत्येक कार्यकर्ता श्रमिकों की संख्या की परवाह किए बिना per_worker_batch_size बैचों की प्रक्रिया करता है।

वर्तमान निर्देशिका में अब दोनों पायथन फाइलें हैं:

0a66e310
main.py
mnist.py

इसलिए json- TF_CONFIG क्रमबद्ध करें और इसे पर्यावरण चर में जोड़ें:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

अब, आप एक कार्यकर्ता प्रक्रिया शुरू कर सकते हैं जो main.py और main.py का उपयोग TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

उपरोक्त आदेश के बारे में ध्यान देने योग्य बातें हैं:

  1. यह %%bash का उपयोग करता है जो कि कुछ bash कमांड चलाने के लिए एक नोटबुक "मैजिक" है।
  2. यह पृष्ठभूमि में bash प्रक्रिया को चलाने के लिए --bg झंडा का उपयोग करता है, क्योंकि यह कार्यकर्ता समाप्त नहीं करेगा। यह शुरू होने से पहले सभी श्रमिकों की प्रतीक्षा करता है।

पृष्ठभूमि वाली वर्कर प्रक्रिया इस नोटबुक में आउटपुट प्रिंट नहीं करेगी, इसलिए &> एक फ़ाइल में इसके आउटपुट को रीडायरेक्ट करता है, इसलिए आप देख सकते हैं कि क्या हुआ।

इसलिए, प्रक्रिया शुरू होने के लिए कुछ सेकंड प्रतीक्षा करें:

import time
time.sleep(10)

अब देखें कि कार्यकर्ता के लॉगफ़ाइल में अब तक क्या उत्पादन हुआ है:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345

लॉग फ़ाइल की अंतिम पंक्ति को कहना चाहिए: Started server with target: grpc://localhost:12345 । पहला कार्यकर्ता अब तैयार है, और आगे बढ़ने के लिए अन्य सभी कार्यकर्ता (कर्मचारी) के तैयार होने की प्रतीक्षा कर रहा है।

तो tf_config के लिए दूसरे कार्यकर्ता की प्रक्रिया के लिए tf_config अपडेट करें:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

अब दूसरा कार्यकर्ता लॉन्च करें। यह प्रशिक्षण शुरू हो जाएगा क्योंकि सभी कार्यकर्ता सक्रिय हैं (इसलिए इस प्रक्रिया की पृष्ठभूमि की कोई आवश्यकता नहीं है):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511
2021-02-23 02:20:43.794926: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:45.375845: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:45.376779: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:46.347650: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:46.347716: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347726: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:46.347847: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:46.347887: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:46.347898: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:46.348715: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:46.349096: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.349700: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:46.353497: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:46.353936: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:23456
2021-02-23 02:20:47.285814: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.507974: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508360: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz

यदि आप पहले कार्यकर्ता द्वारा लिखे गए लॉग को फिर से पढ़ते हैं, तो आप देखेंगे कि यह उस मॉडल के प्रशिक्षण में भाग लेता है:

cat job_0.log
2021-02-23 02:20:33.706454: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
2021-02-23 02:20:35.270749: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:35.271660: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-02-23 02:20:36.222960: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-02-23 02:20:36.223030: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223039: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-692030523
2021-02-23 02:20:36.223151: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 450.51.5
2021-02-23 02:20:36.223184: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 450.51.5
2021-02-23 02:20:36.223191: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:310] kernel version seems to match DSO: 450.51.5
2021-02-23 02:20:36.224026: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-23 02:20:36.224371: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.224902: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-02-23 02:20:36.228825: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:12345, 1 -> localhost:23456}
2021-02-23 02:20:36.229255: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:12345
2021-02-23 02:20:47.286117: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:656] In AUTO-mode, and switching to DATA-based sharding, instead of FILE-based sharding as we cannot find appropriate reader dataset op(s) to shard. Error: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}

2021-02-23 02:20:47.508657: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-02-23 02:20:47.508964: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2000140000 Hz
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2830 - accuracy: 0.1437
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2478 - accuracy: 0.2122
Epoch 3/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2050 - accuracy: 0.3511

इस ट्यूटोरियल की शुरुआत में टेस्ट रन की तुलना में यह आश्चर्यजनक रूप से धीमा था । एक मशीन पर कई श्रमिकों को चलाना केवल ओवरहेड जोड़ता है। यहां लक्ष्य प्रशिक्षण के समय में सुधार करना नहीं था, बल्कि केवल बहु-कार्यकर्ता प्रशिक्षण का एक उदाहरण देना था।

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

गहराई में बहु कार्यकर्ता प्रशिक्षण

अब तक इस ट्यूटोरियल ने एक बुनियादी बहु-कार्यकर्ता सेटअप का प्रदर्शन किया है। इस दस्तावेज़ के बाकी विस्तार में अन्य कारक हैं जो वास्तविक उपयोग के मामलों के लिए उपयोगी या महत्वपूर्ण हो सकते हैं।

डेटासट की धार

बहु-कार्यकर्ता प्रशिक्षण में, अभिसरण और प्रदर्शन सुनिश्चित करने के लिए डेटासेट शार्पिंग की आवश्यकता होती है।

पिछले अनुभाग में उदाहरण tf.distribute.Strategy API द्वारा प्रदान की गई डिफ़ॉल्ट ऑटोशेयरिंग पर निर्भर करता है। आप की स्थापना करके sharding नियंत्रित कर सकते हैं tf.data.experimental.AutoShardPolicy की tf.data.experimental.DistributeOptions । ऑटो-शेरिंग के बारे में अधिक जानने के लिए डिस्ट्रीब्यूटेड इनपुट गाइड देखें

ऑटो शार्टिंग को कैसे बंद करें, इसका एक त्वरित उदाहरण है, इसलिए प्रत्येक प्रतिकृति प्रत्येक उदाहरण को संसाधित करती है (अनुशंसित नहीं):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

मूल्यांकन

यदि आप validation_data को model.fit में पास करते हैं, तो यह प्रत्येक युग के लिए प्रशिक्षण और मूल्यांकन के बीच वैकल्पिक होगा। मूल्यांकन लेने validation_data सभी श्रमिकों के लिए कार्यकर्ताओं के एक ही सेट भर में वितरित किया जाता है और मूल्यांकन परिणाम एकत्रित कर रहे हैं और उपलब्ध नहीं हैं। प्रशिक्षण के समान, सत्यापन डेटासेट स्वचालित रूप से फ़ाइल स्तर पर तेज होता है। आपको सत्यापन डेटासेट में एक वैश्विक बैच आकार सेट validation_steps और validation_steps सेट validation_steps । मूल्यांकन के लिए एक दोहराया डेटासेट भी अनुशंसित है।

वैकल्पिक रूप से, आप एक और कार्य भी बना सकते हैं जो समय-समय पर चौकियों को पढ़ता है और मूल्यांकन चलाता है। ऐसा एस्टिमेटर करता है। लेकिन यह मूल्यांकन करने का एक अनुशंसित तरीका नहीं है और इस प्रकार इसके विवरण को छोड़ दिया जाता है।

प्रदर्शन

अब आपके पास एक केरस मॉडल है जो MultiWorkerMirroredStrategy साथ कई श्रमिकों में चलाने के लिए MultiWorkerMirroredStrategy । आप MultiWorkerMirroredStrategy ट्रेनिंग के मल्टी-वर्कर प्रशिक्षण के प्रदर्शन को निम्न तकनीकों से MultiWorkerMirroredStrategy

  • MultiWorkerMirroredStrategy कई सामूहिक संचार कार्यान्वयन प्रदान करता है। RING आर-आधारित सामूहिक को जीआरपीसी का उपयोग करके क्रॉस-होस्ट संचार परत के रूप में लागू करता है। NCCL सामूहिकता को लागू करने के लिए एनवीडिया के एनसीसीएल का उपयोग करता है। AUTO रनटाइम के लिए पसंद को धता बताता है। सामूहिक कार्यान्वयन का सबसे अच्छा विकल्प जीपीयू की संख्या और प्रकार पर निर्भर करता है, और नेटवर्क क्लस्टर में इंटरकनेक्ट करता है। स्वचालित पसंद को ओवरराइड करने के लिए, MultiWorkerMirroredStrategy के कंस्ट्रक्टर, जैसे communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL) communication_options पैरामीटर निर्दिष्ट करें।
  • यदि संभव हो तो tf.float लिए चर कास्ट करें। आधिकारिक ResNet मॉडल में यह कैसे किया जा सकता है इसका एक उदाहरण शामिल है।

दोष सहिष्णुता

समकालिक प्रशिक्षण में, क्लस्टर विफल हो जाता है यदि श्रमिकों में से एक विफल रहता है और कोई विफलता-पुनर्प्राप्ति तंत्र मौजूद नहीं है। tf.distribute.Strategy का उपयोग tf.distribute.Strategy के साथ करने से उन मामलों में दोष सहिष्णुता का लाभ मिलता है जहां श्रमिक मर जाते हैं या अन्यथा अस्थिर होते हैं। आप अपनी पसंद की वितरित फ़ाइल प्रणाली में प्रशिक्षण राज्य को संरक्षित करके ऐसा करते हैं, जैसे कि उस उदाहरण के पुनरारंभ होने पर जो पहले विफल हो गया था या पहले से हटा दिया गया था, प्रशिक्षण राज्य पुनर्प्राप्त किया जाता है।

जब एक कार्यकर्ता अनुपलब्ध हो जाता है, तो अन्य श्रमिक विफल हो जाएंगे (संभवतः समय समाप्त होने के बाद)। ऐसे मामलों में, अनुपलब्ध कर्मी को पुनः आरंभ करने की आवश्यकता होती है, साथ ही साथ अन्य कर्मी जो असफल रहे हैं।

ModelCheckpoint कॉलबैक

ModelCheckpoint कॉलबैक अब दोष सहिष्णुता कार्यक्षमता प्रदान नहीं करता है, कृपया इसके बजाय BackupAndRestore कॉलबैक का उपयोग करें।

ModelCheckpoint कॉलबैक का उपयोग अभी भी चौकियों को बचाने के लिए किया जा सकता है। लेकिन इसके साथ, यदि प्रशिक्षण को बाधित या सफलतापूर्वक समाप्त कर दिया गया था, तो चेकपॉइंट से प्रशिक्षण जारी रखने के लिए, उपयोगकर्ता को मैन्युअल रूप से लोड करने के लिए जिम्मेदार है।

वैकल्पिक रूप से उपयोगकर्ता ModelCheckpoint कॉलबैक के बाहर मॉडल / वेट को बचाने और पुनर्स्थापित करने का विकल्प चुन सकता है।

मॉडल की बचत और लोडिंग

अपने मॉडल का उपयोग कर बचाने के लिए model.save या tf.saved_model.save , जरूरतों को बचाने के लिए गंतव्य प्रत्येक कार्यकर्ता के लिए अलग अलग हो सकता है। गैर-मुख्य श्रमिकों पर, आपको मॉडल को एक अस्थायी निर्देशिका में सहेजने की आवश्यकता होगी, और प्रमुख पर, आपको प्रदान की गई मॉडल निर्देशिका को सहेजना होगा। एक ही स्थान पर लिखने की कोशिश कर रहे कई श्रमिकों से उत्पन्न त्रुटियों को रोकने के लिए कार्यकर्ता पर अस्थायी निर्देशिकाओं को अद्वितीय होना चाहिए। सभी निर्देशिकाओं में सहेजा गया मॉडल समान है और आमतौर पर केवल प्रमुख द्वारा सहेजे गए मॉडल को पुनर्स्थापित या सेवा के लिए संदर्भित किया जाना चाहिए। आपके पास कुछ सफाई तर्क होना चाहिए जो आपके प्रशिक्षण पूरा होने के बाद श्रमिकों द्वारा बनाई गई अस्थायी निर्देशिकाओं को हटा दें।

एक ही समय में आपको प्रमुख और श्रमिकों को बचाने की आवश्यकता है, क्योंकि आप चेकपॉइंटिंग के दौरान चर को एकत्र कर सकते हैं, जिसके लिए मुख्य और श्रमिकों दोनों को एलायड संचार प्रोटोकॉल में भाग लेने की आवश्यकता होती है। दूसरी ओर, मुख्य और श्रमिकों को एक ही मॉडल निर्देशिका में सहेजने देने से विवाद के कारण त्रुटियां होंगी।

MultiWorkerMirroredStrategy साथ, कार्यक्रम प्रत्येक कार्यकर्ता पर चलाया जाता है, और यह जानने के लिए कि क्या वर्तमान कार्यकर्ता मुख्य है, यह क्लस्टर रिज़ॉल्वर ऑब्जेक्ट का लाभ उठाता है, जिसमें task_type और task_id विशेषताएँ हैं। task_type आपको बताता है कि वर्तमान नौकरी क्या है (जैसे 'कार्यकर्ता'), और task_id आपको कार्यकर्ता की पहचानकर्ता बताता है। आईडी 0 वाले कार्यकर्ता को मुख्य कार्यकर्ता के रूप में नामित किया गया है।

नीचे दिए गए कोड स्निपेट में, write_filepath लिखने के लिए फ़ाइल पथ प्रदान करता है, जो कि कार्यकर्ता आईडी पर निर्भर करता है। प्रमुख के मामले में (आईडी 0 के साथ कार्यकर्ता), यह मूल फ़ाइल पथ को लिखता है; दूसरों के लिए, यह लिखने के लिए एक अस्थायी निर्देशिका (निर्देशिका पथ में आईडी के साथ) बनाता है:

model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to 
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this colab section, we also add `task_type is None` 
  # case because it is effectively run with only single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

इसके साथ, आप अब सहेजने के लिए तैयार हैं:

multi_worker_model.save(write_model_path)
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

जैसा कि ऊपर वर्णित है, बाद में मॉडल को केवल सहेजे गए पथ प्रमुख से लोड किया जाना चाहिए, तो चलिए गैर-प्रमुख श्रमिकों को अस्थायी हटा दें:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

अब, जब यह लोड करने का समय है, तो सुविधाजनक tf.keras.models.load_model API का उपयोग करें, और आगे के काम के साथ जारी रखें। यहां, प्रशिक्षण और लोड जारी रखने के लिए केवल एकल कार्यकर्ता का उपयोग करने के लिए मान लें, जिस स्थिति में आप किसी अन्य strategy.scope() भीतर tf.keras.models.load_model नहीं कहते हैं। strategy.scope()

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 13ms/step - loss: 2.3041 - accuracy: 7.8125e-04
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2873 - accuracy: 0.0023
<tensorflow.python.keras.callbacks.History at 0x7f62c4ef5048>

चेकपॉइंट की बचत और पुनर्स्थापना

दूसरी ओर, चेकपॉइंटिंग आपको मॉडल के वजन को बचाने और पूरे मॉडल को बचाने के बिना उन्हें पुनर्स्थापित करने की अनुमति देता है। यहां, आप एक tf.train.Checkpoint मॉडल तैयार करेंगे, जिसे tf.train.CheckpointManager द्वारा प्रबंधित किया जाता है, ताकि केवल नवीनतम चेकपॉइंट संरक्षित रहे।

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

एक बार CheckpointManager सेट हो जाने के बाद, आप अब सहेजे गए चौकियों को बचाने और हटाने के लिए तैयार हैं, जो गैर-मुख्य श्रमिकों को बचाए हुए हैं।

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

अब, जब आपको पुनर्स्थापित करने की आवश्यकता होती है, तो आप सुविधाजनक tf.train.latest_checkpoint फ़ंक्शन का उपयोग करके सहेजे गए नवीनतम चेकपॉइंट पा सकते हैं। चौकी को बहाल करने के बाद, आप प्रशिक्षण के साथ जारी रख सकते हैं।

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 3s 13ms/step - loss: 2.3050 - accuracy: 0.0920
Epoch 2/2
20/20 [==============================] - 0s 12ms/step - loss: 2.2963 - accuracy: 0.0896
<tensorflow.python.keras.callbacks.History at 0x7f62c44a2710>

BackupAndRestore कॉलबैक

BackupAndRestore कॉलबैक के तहत मॉडल और एक अस्थायी चौकी फ़ाइल में वर्तमान युग संख्या का बैकअप लेने से, दोष सहिष्णुता सुविधा प्रदान करता है backup_dir को तर्क BackupAndRestore । यह प्रत्येक युग के अंत में किया जाता है।

एक बार नौकरी बाधित होने और फिर से शुरू होने के बाद, कॉलबैक अंतिम चेकपॉइंट को पुनर्स्थापित करता है, और बाधित युग की शुरुआत से प्रशिक्षण जारी रहता है। किसी भी आंशिक प्रशिक्षण को पहले से ही अधूरा युग में बिना किसी रुकावट के पहले किया जाता है, ताकि इसे अंतिम मॉडल स्थिति को प्रभावित न किया जाए।

इसका उपयोग करने के लिए, tf.keras.callbacks.experimental.BackupAndRestore का एक उदाहरण tf.keras.Model.fit() कॉल करें।

MultiWorkerMirroredStrategy के साथ, यदि कोई कार्यकर्ता बाधित हो जाता है, तो बाधित कार्यकर्ता के पुनरारंभ होने तक पूरा क्लस्टर रुक जाता है। अन्य कार्यकर्ता भी पुनरारंभ करेंगे, और बाधित कार्यकर्ता क्लस्टर में शामिल हो जाता है। फिर, प्रत्येक कार्यकर्ता चेकपॉइंट फ़ाइल को पढ़ता है जो पहले सहेजी गई थी और अपनी पूर्व स्थिति को चुनती है, जिससे क्लस्टर सिंक में वापस आ सके। फिर प्रशिक्षण जारी है।

BackupAndRestore कॉलबैक, प्रशिक्षण स्थिति को बचाने और पुनर्स्थापित करने के लिए CheckpointManager का उपयोग करता है, जो चेकपॉइंट नामक एक फ़ाइल उत्पन्न करता है जो नवीनतम के साथ मौजूदा चौकियों को ट्रैक करता है। इस कारण से, backup_dir को नाम की टक्कर से बचने के लिए अन्य चौकियों को संग्रहीत करने के लिए फिर से उपयोग नहीं किया जाना चाहिए।

वर्तमान में, BackupAndRestore कॉलबैक सिंगल वर्कर को बिना किसी रणनीति के, मिररडस्ट्रोस्ट्रैटी और मल्टी-वर्कर के साथ MultiWorkerMirroredStrategy का समर्थन करता है। नीचे बहु-कार्यकर्ता प्रशिक्षण और एकल कार्यकर्ता प्रशिक्षण दोनों के लिए दो उदाहरण हैं।

# Multi-worker training with MultiWorkerMirroredStrategy.

callbacks = [tf.keras.callbacks.experimental.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
Epoch 1/3
70/70 [==============================] - 4s 13ms/step - loss: 2.2930 - accuracy: 0.1316
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.2467 - accuracy: 0.2765
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1963 - accuracy: 0.3645
<tensorflow.python.keras.callbacks.History at 0x7f62c4371390>

यदि आप backup_dir की निर्देशिका का निरीक्षण करते हैं, जो आपने BackupAndRestore में निर्दिष्ट किया है, तो आप कुछ अस्थायी रूप से उत्पन्न चेकपॉइंट फ़ाइलों को देख सकते हैं। उन फ़ाइलों को पहले खोए हुए उदाहरणों को पुनर्प्राप्त करने के लिए आवश्यक है, और उन्हें आपके प्रशिक्षण के सफल निकास पर tf.keras.Model.fit() के अंत में लाइब्रेरी द्वारा हटा दिया जाएगा।

यह सभी देखें

  1. TensorFlow गाइड में वितरित प्रशिक्षण उपलब्ध वितरण रणनीतियों का अवलोकन प्रदान करता है।
  2. आधिकारिक मॉडल , जिनमें से कई को कई वितरण रणनीतियों को चलाने के लिए कॉन्फ़िगर किया जा सकता है।
  3. गाइड में प्रदर्शन अनुभाग अन्य रणनीतियों और उपकरणों के बारे में जानकारी प्रदान करता है जिनका उपयोग आप अपने TensorFlow मॉडल के प्रदर्शन को अनुकूलित करने के लिए कर सकते हैं।