Google I/O से कीनोट, उत्पाद सत्र, कार्यशालाएं और बहुत कुछ देखें प्लेलिस्ट देखें

tf.distribute.Strategy . के साथ कस्टम प्रशिक्षण

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

यह ट्यूटोरियल दर्शाता है कि कस्टम प्रशिक्षण लूप के साथ tf.distribute.Strategy का उपयोग कैसे करें। हम फैशन MNIST डेटासेट पर एक साधारण CNN मॉडल को प्रशिक्षित करेंगे। फैशन एमएनआईएसटी डेटासेट में आकार 28 x 28 की 60000 ट्रेन छवियां और आकार 28 x 28 की 10000 परीक्षण छवियां शामिल हैं।

हम अपने मॉडल को प्रशिक्षित करने के लिए कस्टम प्रशिक्षण लूप का उपयोग कर रहे हैं क्योंकि वे हमें लचीलापन और प्रशिक्षण पर अधिक नियंत्रण प्रदान करते हैं। इसके अलावा, मॉडल और प्रशिक्षण लूप को डीबग करना आसान है।

# Import TensorFlow
import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.5.0

फैशन एमएनआईएसटी डेटासेट डाउनलोड करें

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

चर और ग्राफ़ वितरित करने के लिए एक रणनीति बनाएं Create

tf.distribute.MirroredStrategy रणनीति कैसे काम करती है?

  • सभी चर और मॉडल ग्राफ को प्रतिकृतियों पर दोहराया गया है।
  • इनपुट समान रूप से प्रतिकृतियों में वितरित किया जाता है।
  • प्रत्येक प्रतिकृति प्राप्त इनपुट के लिए नुकसान और ग्रेडिएंट की गणना करती है।
  • ग्रेडिएंट्स को सभी प्रतिकृतियों में समेट कर समन्‍वयित किया जाता है।
  • सिंक के बाद, प्रत्येक प्रतिकृति पर चर की प्रतियों के लिए एक ही अद्यतन किया जाता है।
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1

सेटअप इनपुट पाइपलाइन

प्लेटफ़ॉर्म-अज्ञेयवादी सहेजे गए मॉडल प्रारूप में ग्राफ़ और चर निर्यात करें। आपका मॉडल सहेजे जाने के बाद, आप इसे स्कोप के साथ या उसके बिना लोड कर सकते हैं।

BUFFER_SIZE = len(train_images)

BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

EPOCHS = 10

डेटासेट बनाएं और उन्हें वितरित करें:

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) 
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) 

train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)

मॉडल बनाएं

tf.keras.Sequential का उपयोग करके एक मॉडल बनाएं। ऐसा करने के लिए आप मॉडल सबक्लासिंग एपीआई का भी उपयोग कर सकते हैं।

०७४०००७१६००८४af२७८९०

हानि फ़ंक्शन को परिभाषित करें

आम तौर पर, 1 GPU/CPU वाली एकल मशीन पर, हानि को इनपुट के बैच में उदाहरणों की संख्या से विभाजित किया जाता है।

तो, tf.distribute.Strategy का उपयोग करते समय हानि की गणना कैसे की जानी चाहिए?

  • उदाहरण के लिए, मान लें कि आपके पास 4 GPU है और 64 का बैच आकार है। इनपुट का एक बैच प्रतिकृतियों (4 GPU) में वितरित किया जाता है, प्रत्येक प्रतिकृति को आकार 16 का इनपुट मिलता है।

  • प्रत्येक प्रतिकृति पर मॉडल अपने संबंधित इनपुट के साथ फॉरवर्ड पास करता है और नुकसान की गणना करता है। अब, नुकसान को उसके संबंधित इनपुट (BATCH_SIZE_PER_REPLICA = 16) में उदाहरणों की संख्या से विभाजित करने के बजाय, नुकसान को GLOBAL_BATCH_SIZE (64) से विभाजित किया जाना चाहिए।

यह क्यों?

  • ऐसा करने की आवश्यकता है क्योंकि प्रत्येक प्रतिकृति पर ग्रेडियेंट की गणना के बाद, उन्हें संक्षेप में प्रतिकृतियों में समन्वयित किया जाता है।

TensorFlow में यह कैसे करें?

  • यदि आप एक कस्टम प्रशिक्षण लूप लिख रहे हैं, जैसा कि इस ट्यूटोरियल में है, तो आपको प्रति उदाहरण हानियों का योग करना चाहिए और योग को GLOBAL_BATCH_SIZE से विभाजित करना चाहिए: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) या आप tf.nn.compute_average_loss उपयोग कर सकते हैं tf.nn.compute_average_loss जो प्रति उदाहरण हानि, वैकल्पिक नमूना भार, और GLOBAL_BATCH_SIZE को तर्क के रूप में लेता है और स्केल की गई हानि लौटाता है।

  • यदि आप अपने मॉडल में नियमितीकरण के नुकसान का उपयोग कर रहे हैं तो आपको नुकसान के मूल्य को प्रतिकृतियों की संख्या से मापना होगा। आप इसे tf.nn.scale_regularization_loss फ़ंक्शन का उपयोग करके कर सकते हैं।

  • tf.reduce_mean का उपयोग करने की अनुशंसा नहीं की जाती है। ऐसा करने से नुकसान वास्तविक प्रति प्रतिकृति बैच आकार से विभाजित हो जाता है जो चरण दर चरण भिन्न हो सकता है।

  • यह कमी और स्केलिंग keras में स्वचालित रूप से किया जाता है model.compile और model.fit

  • यदिtf.keras.losses वर्गों का उपयोग करtf.keras.losses हैं (जैसा कि नीचे दिए गए उदाहरण में है), हानि में कमी को स्पष्ट रूप से NONE या SUM एक होने के लिए निर्दिष्ट करने की आवश्यकता है। tf.distribute.Strategy . tf.distribute.Strategy साथ उपयोग किए जाने पर AUTO और SUM_OVER_BATCH_SIZE की अनुमति नहीं है। AUTO की अनुमति नहीं है क्योंकि उपयोगकर्ता को स्पष्ट रूप से सोचना चाहिए कि वे किस कमी को सुनिश्चित करना चाहते हैं कि यह वितरित मामले में सही है। SUM_OVER_BATCH_SIZE की अनुमति नहीं है क्योंकि वर्तमान में यह केवल प्रति प्रतिकृति बैच आकार से विभाजित होगा, और उपयोगकर्ता को प्रतिकृतियों की संख्या से विभाजित करना छोड़ देगा, जिसे याद करना आसान हो सकता है। इसलिए इसके बजाय हम उपयोगकर्ता से स्पष्ट रूप से कटौती करने के लिए कहते हैं।

  • यदि labels बहु-आयामी हैं, तो प्रत्येक नमूने में तत्वों की संख्या में per_example_loss औसत करें। उदाहरण के लिए, यदि predictions का आकार (batch_size, H, W, n_classes) और labels (batch_size, H, W) (batch_size, H, W, n_classes) (batch_size, H, W) , तो आपको per_example_loss को अपडेट करना per_example_loss जैसे: per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)

with strategy.scope():
  # Set reduction to `none` so we can do the reduction afterwards and divide by
  # global batch size.
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True,
      reduction=tf.keras.losses.Reduction.NONE)
  def compute_loss(labels, predictions):
    per_example_loss = loss_object(labels, predictions)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)

हानि और सटीकता को ट्रैक करने के लिए मेट्रिक्स को परिभाषित करें

ये मेट्रिक्स परीक्षण हानि और प्रशिक्षण और परीक्षण सटीकता को ट्रैक करते हैं। आप किसी भी समय संचित आंकड़े प्राप्त करने के लिए .result() का उपयोग कर सकते हैं।

with strategy.scope():
  test_loss = tf.keras.metrics.Mean(name='test_loss')

  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')
  test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

प्रशिक्षण पाश

# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
  model = create_model()

  optimizer = tf.keras.optimizers.Adam()

  checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
  images, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(images, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_accuracy.update_state(labels, predictions)
  return loss 

def test_step(inputs):
  images, labels = inputs

  predictions = model(images, training=False)
  t_loss = loss_object(labels, predictions)

  test_loss.update_state(t_loss)
  test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
  per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

@tf.function
def distributed_test_step(dataset_inputs):
  return strategy.run(test_step, args=(dataset_inputs,))

for epoch in range(EPOCHS):
  # TRAIN LOOP
  total_loss = 0.0
  num_batches = 0
  for x in train_dist_dataset:
    total_loss += distributed_train_step(x)
    num_batches += 1
  train_loss = total_loss / num_batches

  # TEST LOOP
  for x in test_dist_dataset:
    distributed_test_step(x)

  if epoch % 2 == 0:
    checkpoint.save(checkpoint_prefix)

  template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
              "Test Accuracy: {}")
  print (template.format(epoch+1, train_loss,
                         train_accuracy.result()*100, test_loss.result(),
                         test_accuracy.result()*100))

  test_loss.reset_states()
  train_accuracy.reset_states()
  test_accuracy.reset_states()
Epoch 1, Loss: 0.5080616474151611, Accuracy: 81.83833312988281, Test Loss: 0.3824280798435211, Test Accuracy: 86.6199951171875
Epoch 2, Loss: 0.3330748677253723, Accuracy: 87.9949951171875, Test Loss: 0.3359815776348114, Test Accuracy: 87.95999908447266
Epoch 3, Loss: 0.28687483072280884, Accuracy: 89.54666900634766, Test Loss: 0.30980998277664185, Test Accuracy: 89.04000091552734
Epoch 4, Loss: 0.2584429383277893, Accuracy: 90.50499725341797, Test Loss: 0.2756710350513458, Test Accuracy: 90.02999877929688
Epoch 5, Loss: 0.23276594281196594, Accuracy: 91.52833557128906, Test Loss: 0.2794630527496338, Test Accuracy: 89.8800048828125
Epoch 6, Loss: 0.2117633819580078, Accuracy: 92.18333435058594, Test Loss: 0.2680763602256775, Test Accuracy: 90.2300033569336
Epoch 7, Loss: 0.19373847544193268, Accuracy: 92.80500030517578, Test Loss: 0.26665198802948, Test Accuracy: 90.24000549316406
Epoch 8, Loss: 0.17856450378894806, Accuracy: 93.42500305175781, Test Loss: 0.2541039288043976, Test Accuracy: 91.04000091552734
Epoch 9, Loss: 0.16165423393249512, Accuracy: 94.06999969482422, Test Loss: 0.2580500841140747, Test Accuracy: 90.77999877929688
Epoch 10, Loss: 0.15017499029636383, Accuracy: 94.3566665649414, Test Loss: 0.2751633822917938, Test Accuracy: 90.47000122070312

ऊपर के उदाहरण में ध्यान देने योग्य बातें:

  • हम for x in ... निर्माण के for x in ... a का उपयोग करके train_dist_dataset और test_dist_dataset पर पुनरावृति कर रहे हैं।
  • स्केल्ड लॉस distributed_train_step का रिटर्न वैल्यू है। यह मान tf.distribute.Strategy.reduce कॉल का उपयोग करके और फिर tf.distribute.Strategy.reduce कॉल के रिटर्न मान को tf.distribute.Strategy.reduce सभी प्रतिकृतियों में एकत्रित किया tf.distribute.Strategy.reduce है।
  • tf.keras.Metrics को train_step और test_step अंदर अपडेट किया जाना चाहिए जो tf.distribute.Strategy.run द्वारा निष्पादित हो जाता है। * tf.distribute.Strategy.run रणनीति में प्रत्येक स्थानीय प्रतिकृति से परिणाम देता है, और इस परिणाम का उपभोग करने के कई तरीके हैं। आप एक समेकित मूल्य प्राप्त करने के लिए tf.distribute.Strategy.reduce कर सकते हैं। आप परिणाम में निहित मानों की सूची प्राप्त करने के लिए tf.distribute.Strategy.experimental_local_results भी कर सकते हैं, एक प्रति स्थानीय प्रतिकृति।

नवीनतम चेकपॉइंट को पुनर्स्थापित करें और परीक्षण करें

tf.distribute.Strategy साथ चेकपॉइंट किया गया एक मॉडल रणनीति के साथ या उसके बिना बहाल किया जा सकता है।

eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='eval_accuracy')

new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()

test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
  predictions = new_model(images, training=False)
  eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

for images, labels in test_dataset:
  eval_step(images, labels)

print ('Accuracy after restoring the saved model without strategy: {}'.format(
    eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 90.77999877929688

डेटासेट पर पुनरावृति के वैकल्पिक तरीके

इटरेटर्स का उपयोग करना

आप पूरे डाटासेट के माध्यम से नहीं चरणों की दी गई संख्या और अधिक पुनरावृति करना चाहते हैं आप एक इटरेटर का उपयोग कर बना सकते हैं iter कॉल और स्पष्ट रूप से कॉल next पुनरावर्तक पर। आप tf.function के अंदर और बाहर डेटासेट पर पुनरावृति करना चुन सकते हैं। यहाँ एक छोटा सा स्निपेट है जो एक पुनरावर्तक का उपयोग करके tf.function के बाहर डेटासेट के पुनरावृत्ति को प्रदर्शित करता है।

for _ in range(EPOCHS):
  total_loss = 0.0
  num_batches = 0
  train_iter = iter(train_dist_dataset)

  for _ in range(10):
    total_loss += distributed_train_step(next(train_iter))
    num_batches += 1
  average_train_loss = total_loss / num_batches

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
  train_accuracy.reset_states()
Epoch 10, Loss: 0.14204928278923035, Accuracy: 94.53125
Epoch 10, Loss: 0.12290157377719879, Accuracy: 95.78125
Epoch 10, Loss: 0.13413389027118683, Accuracy: 95.78125
Epoch 10, Loss: 0.14106282591819763, Accuracy: 95.0
Epoch 10, Loss: 0.13664354383945465, Accuracy: 95.15625
Epoch 10, Loss: 0.14284934103488922, Accuracy: 94.6875
Epoch 10, Loss: 0.1361488401889801, Accuracy: 95.15625
Epoch 10, Loss: 0.13652849197387695, Accuracy: 94.53125
Epoch 10, Loss: 0.10971339046955109, Accuracy: 96.09375
Epoch 10, Loss: 0.1285729855298996, Accuracy: 95.3125

एक tf.function के अंदर पुनरावृति

आप train_dist_dataset अंदर for x in ... निर्माण का उपयोग करके या ऊपर दिए गए train_dist_dataset संपूर्ण इनपुट train_dist_dataset पर पुनरावृति भी कर सकते हैं। नीचे दिया गया उदाहरण एक tf.function में प्रशिक्षण के एक युग को लपेटने और फ़ंक्शन के अंदर train_dist_dataset पर पुनरावृति train_dist_dataset है।

@tf.function
def distributed_train_epoch(dataset):
  total_loss = 0.0
  num_batches = 0
  for x in dataset:
    per_replica_losses = strategy.run(train_step, args=(x,))
    total_loss += strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    num_batches += 1
  return total_loss / tf.cast(num_batches, dtype=tf.float32)

for epoch in range(EPOCHS):
  train_loss = distributed_train_epoch(train_dist_dataset)

  template = ("Epoch {}, Loss: {}, Accuracy: {}")
  print (template.format(epoch+1, train_loss, train_accuracy.result()*100))

  train_accuracy.reset_states()
Epoch 1, Loss: 0.13583378493785858, Accuracy: 94.90333557128906
Epoch 2, Loss: 0.12437015771865845, Accuracy: 95.38833618164062
Epoch 3, Loss: 0.11175186187028885, Accuracy: 95.82499694824219
Epoch 4, Loss: 0.10018172115087509, Accuracy: 96.25666809082031
Epoch 5, Loss: 0.09223354607820511, Accuracy: 96.55999755859375
Epoch 6, Loss: 0.08545086532831192, Accuracy: 96.84166717529297
Epoch 7, Loss: 0.07955380529165268, Accuracy: 97.03666687011719
Epoch 8, Loss: 0.07226849347352982, Accuracy: 97.3316650390625
Epoch 9, Loss: 0.06290334463119507, Accuracy: 97.6883316040039
Epoch 10, Loss: 0.06105658411979675, Accuracy: 97.72666931152344

प्रतिकृतियों में ट्रैकिंग प्रशिक्षण हानि

हम tf.metrics.Mean का उपयोग करने की अनुशंसा नहीं करते हैं, जो tf.metrics.Mean में प्रशिक्षण हानि को ट्रैक करने के लिए किया जाता है, क्योंकि हानि स्केलिंग गणना की जाती है।

उदाहरण के लिए, यदि आप निम्नलिखित विशेषताओं के साथ एक प्रशिक्षण कार्य चलाते हैं:

  • दो प्रतिकृतियां
  • प्रत्येक प्रतिकृति पर दो नमूने संसाधित किए जाते हैं
  • परिणामी हानि मान: [२, ३] और [४, ५] प्रत्येक प्रतिकृति पर
  • वैश्विक बैच आकार = 4

हानि स्केलिंग के साथ, आप हानि मान जोड़कर, और फिर वैश्विक बैच आकार से विभाजित करके प्रत्येक प्रतिकृति पर हानि के प्रति-नमूना मान की गणना करते हैं। इस मामले में: (2 + 3) / 4 = 1.25 और (4 + 5) / 4 = 2.25

यदि आप दो tf.metrics.Mean में हानि को ट्रैक करने के लिए tf.metrics.Mean का उपयोग करते हैं, तो परिणाम भिन्न होता है। इस उदाहरण में, आप total 3.50 और 2 की count के साथ समाप्त होते हैं, जिसके result() मीट्रिक पर result() को कॉल करने पर total / count = 1.75 होती है। tf.keras.Metrics साथ गणना की गई हानि को एक अतिरिक्त कारक द्वारा बढ़ाया जाता है जो सिंक में प्रतिकृतियों की संख्या के बराबर होता है।

गाइड और उदाहरण

कस्टम प्रशिक्षण लूप के साथ वितरण रणनीति का उपयोग करने के लिए यहां कुछ उदाहरण दिए गए हैं:

  1. वितरित प्रशिक्षण गाइड
  2. DenseNet उदाहरण का उपयोग MirroredStrategy
  3. बर्ट उदाहरण का उपयोग प्रशिक्षित MirroredStrategy और TPUStrategy । यह उदाहरण विशेष रूप से यह समझने में मददगार है कि कैसे चेकपॉइंट से लोड किया जाए और वितरित प्रशिक्षण आदि के दौरान समय-समय पर चौकियों को कैसे बनाया जाए।
  4. एनसीएफ उदाहरण का उपयोग प्रशिक्षित MirroredStrategy कि का उपयोग कर सक्रिय किया जा सकता keras_use_ctl झंडा।
  5. NMT उदाहरण का उपयोग प्रशिक्षित MirroredStrategy

वितरण रणनीति गाइड में सूचीबद्ध अधिक उदाहरण।

अगला कदम