তারিখটা মনে রেখো! গুগল I / O মে 18-20 মে এখনই রেজিস্টার করুন
This page was translated by the Cloud Translation API.
Switch to English

প্যারামিটার সার্ভার প্রশিক্ষণ

টেনসরফ্লো.আর.জে দেখুন গুগল কোলাবে চালান গিটহাবের উত্স দেখুন নোটবুক ডাউনলোড করুন

ওভারভিউ

প্যারামিটার সার্ভার প্রশিক্ষণ একাধিক মেশিনে মডেল প্রশিক্ষণের জন্য স্কেল করার জন্য একটি সাধারণ তথ্য সমান্তরাল পদ্ধতি। একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টারে কর্মী এবং পরামিতি সার্ভার থাকে। ভেরিয়েবলগুলি প্যারামিটার সার্ভারগুলিতে তৈরি করা হয় এবং তারা প্রতিটি ধাপে শ্রমিকদের দ্বারা পড়া এবং আপডেট করা হয়। ডিফল্টরূপে, কর্মীরা একে অপরের সাথে সিঙ্ক্রোনাইজ না করে এই ভেরিয়েবলগুলি স্বাধীনভাবে পড়ে এবং আপডেট করে। এ কারণেই কখনও কখনও প্যারামিটার সার্ভার-স্টাইলের প্রশিক্ষণকে অ্যাসিনক্রোনাস প্রশিক্ষণ বলা হয়।

tf.distribute.experimental.ParameterServerStrategy , প্যারামিটার সার্ভার প্রশিক্ষণটি tf.distribute.experimental.ParameterServerStrategy ক্লাস দ্বারা চালিত হয়, যা একটি ক্লাস্টারে প্রশিক্ষণের পদক্ষেপগুলি বিতরণ করে যা কয়েক হাজার কর্মী (প্যারামিটার সার্ভারের সাথে) পর্যন্ত স্কেল করে tf.distribute.experimental.ParameterServerStrategy দুটি প্রধান সমর্থিত প্রশিক্ষণ এপিআই রয়েছে: কেরাস প্রশিক্ষণ এপিআই, Model.fit এবং কাস্টম প্রশিক্ষণ লুপ (সিটিএল) নামেও পরিচিত। ব্যবহারকারীরা যখন উচ্চ-স্তরের বিমূর্ততা এবং প্রশিক্ষণের ব্যবস্থাপনাকে পছন্দ করেন তখন Model.fit সুপারিশ করা হয়, যখন ব্যবহারকারীরা তাদের প্রশিক্ষণ লুপের বিশদটি সংজ্ঞায়িত করতে পছন্দ করেন তখন সিটিএল প্রস্তাবিত হয়।

পছন্দের এপিআই নির্বিশেষে, টিএফ 2-তে বিতরণ প্রশিক্ষণে বেশ কয়েকটি "চাকরি" সহ একটি "ক্লাস্টার" জড়িত এবং প্রতিটি কাজের মধ্যে একটি বা একাধিক "কার্য" থাকতে পারে। প্যারামিটার সার্ভার প্রশিক্ষণ ব্যবহার করার সময়, একটি সমন্বয়কারী কাজ (যা কাজের নাম chief ), একাধিক কর্মী চাকুরী (কাজের নাম worker ) এবং একাধিক প্যারামিটার সার্ভার জব (কাজের নাম ps ) রাখার পরামর্শ দেওয়া হয়।

যখন সমন্বয়কারী সংস্থান তৈরি করে, প্রশিক্ষণের কাজ প্রেরণ করে, চেকপয়েন্টগুলি লেখেন এবং টাস্ক ব্যর্থতার সাথে সম্পর্কিত হন, তখন কর্মীরা এবং পরামিতি সার্ভারগুলি tf.distribute.Server যা সমন্বয়কের অনুরোধের জন্য শোনেন।

Model.fit এপিআই সহ প্যারামিটার সার্ভার প্রশিক্ষণ

Model.fit এপিআই সহ প্যারামিটার সার্ভার প্রশিক্ষণের জন্য সমন্বয়কারীকে একটি tf.distribute.experimental.ParameterServerStrategy অবজেক্ট এবং একটি tf.keras.utils.experimental.DatasetCreator ইনপুট হিসাবে ব্যবহার করতে হবে। কোনও কৌশল ছাড়াই বা অন্যান্য কৌশলগুলির সাথে Model.fit ব্যবহারের মতো, ওয়ার্কফ্লোতে মডেল তৈরি এবং সংকলন করা, কলব্যাকগুলি প্রস্তুত করা, তারপরে একটি Model.fit কল অন্তর্ভুক্ত।

কাস্টম প্রশিক্ষণ লুপ (সিটিএল) এপিআই সহ প্যারামিটার সার্ভার প্রশিক্ষণ

tf.distribute.experimental.coordinator.ClusterCoordinator সহ, tf.distribute.experimental.coordinator.ClusterCoordinator বর্গ হ'ল সমন্বয়কারীর জন্য ব্যবহৃত মূল উপাদান। ClusterCoordinator শ্রেণীর tf.distribute.Strategy অবজেক্টের সাথে একত্রে কাজ করা দরকার। এই tf.distribute.Strategy অবজেক্টটি ক্লাস্টারের তথ্য সরবরাহ করার জন্য প্রয়োজনীয় এবং প্রশিক্ষণ ধাপটি সংজ্ঞায়িত করতে ব্যবহৃত হয় যেমন আমরা MirroredStrategy সাথে কাস্টম প্রশিক্ষণে দেখেছি। ClusterCoordinator অবজেক্ট তারপরে এই প্রশিক্ষণ পদক্ষেপগুলি কার্যকর করে প্রত্যন্ত কর্মীদের কাছে প্রেরণ করে। প্যারামিটার সার্ভার প্রশিক্ষণের জন্য, ClusterCoordinator একটি tf.distribute.experimental.ParameterServerStrategy সাথে কাজ করা tf.distribute.experimental.ParameterServerStrategy

ClusterCoordinator অবজেক্ট দ্বারা সরবরাহ করা সর্বাধিক গুরুত্বপূর্ণ এপিআই হ'ল scheduleschedule এপিআই একটি tf.function এবং অবিলম্বে একটি ভবিষ্যতের মত RemoteValue ফিরে। RemoteValue দূরবর্তী কর্মীদের কাছে পটভূমির থ্রেডগুলিতে প্রেরণ করা হবে এবং তাদের RemoteValue পূরণ করা হবে। যেহেতু schedule জন্য কর্মী নিয়োগের প্রয়োজন হয় না, তাই পাস করা tf.function কোনও উপলব্ধ কর্মীর উপর কার্যকর করা যেতে পারে। যদি এটিতে মৃত্যুদণ্ড কার্যকর করা হয় এমন কর্মীটি এর সমাপ্তির পূর্বে অনুপলব্ধ হয়ে যায়, তবে অন্য উপলব্ধ কর্মীর উপর ফাংশনটি আবার চেষ্টা করা হবে। এই বাস্তবতা এবং ফাংশন সম্পাদন পারমাণবিক নয় এই কারণে, একটি ফাংশন একাধিকবার কার্যকর করা হতে পারে।

রিমোট ফাংশন প্রেরণের পাশাপাশি, ClusterCoordinator যখন সমস্ত শ্রমিকের ব্যর্থতা থেকে ফিরে আসে তখন সমস্ত কর্মীদের উপর ডেটাসেট তৈরি করতে এবং এই ডেটাসেটগুলি পুনর্নির্মাণে সহায়তা করে।

টিউটোরিয়াল সেটআপ

টিউটোরিয়ালটি সিটিএল বা Model.fit পাথগুলিতে শাখা করবে এবং আপনি আপনার প্রয়োজন অনুসারে একটি চয়ন করতে পারেন। "এক্স ট্রেনিং উইথ এক্স" ব্যতীত অন্যান্য বিভাগ দুটি পাথের জন্যই প্রযোজ্য।

pip install -q portpicker
pip install -q tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers.experimental.preprocessing as kpl

ক্লাস্টার সেটআপ

উপরে উল্লিখিত হিসাবে, একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টারের একটি সমন্বয়কারী টাস্ক প্রয়োজন যা আপনার প্রশিক্ষণ প্রোগ্রামটি চালায়, এক বা একাধিক কর্মী এবং পরামিতি সার্ভার tf.distribute.Server সার্ভারগুলি চালনা করে, যেমন tf.distribute.Server , এবং সম্ভবত একটি অতিরিক্ত মূল্যায়ন টাস্ক যা সাইড-কার চালায় tf.distribute.Server মূল্যায়ন (নীচে পার্শ্ব-গাড়ী মূল্যায়ন বিভাগ দেখুন)। এগুলি সেট আপ করার প্রয়োজনীয়তাগুলি হ'ল:

  • সমন্বয়কারী টাস্কের মূল্যায়নকারী ব্যতীত অন্য সমস্ত টেনসরফ্লো সার্ভারের ঠিকানা এবং পোর্টগুলি জানতে হবে।
  • কর্মীরা এবং প্যারামিটার সার্ভারগুলি তাদের কোন পোর্টটি শুনতে হবে তা জানতে হবে। সরলতার স্বার্থে, আমরা যখন এই কাজগুলিতে টেনসরফ্লো সার্ভার তৈরি করি তখন আমরা সাধারণত সম্পূর্ণ ক্লাস্টার তথ্যটি পাস করি।
  • মূল্যায়নকারী কার্য প্রশিক্ষণ ক্লাস্টারের সেটআপ জানতে হবে না। যদি এটি হয় তবে প্রশিক্ষণ ক্লাস্টারের সাথে সংযোগ স্থাপনের চেষ্টা করা উচিত নয়।
  • শ্রমিক এবং প্যারামিটার সার্ভারগুলির যথাক্রমে "কর্মী" এবং "পিএস" হিসাবে টাস্ক ধরণের হওয়া উচিত। সমন্বয়কারীকে উত্তরাধিকারগত কারণে টাস্ক টাইপ হিসাবে "চিফ" ব্যবহার করা উচিত।

এই টিউটোরিয়ালে, আমরা একটি প্রসেস ক্লাস্টার তৈরি করব যাতে পুরো পরামিতি সার্ভার প্রশিক্ষণ কোলাবে চালানো যায়। পরবর্তী বিভাগে কীভাবে আসল ক্লাস্টার সেটআপ করা যায় তা আমরা উপস্থাপন করব।

প্রক্রিয়া ক্লাস্টার

এই টিউটোরিয়ালে, আমরা আগে থেকে টেনসরফ্লো সার্ভারগুলির একটি গুচ্ছ শুরু করব এবং তাদের সাথে পরে সংযুক্ত করব। নোট করুন যে এটি কেবলমাত্র এই টিউটোরিয়ালটির প্রদর্শনের উদ্দেশ্যে, এবং প্রকৃত প্রশিক্ষণে সার্ভারগুলি কর্মী এবং পিএস মেশিনে শুরু করা হবে।

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec, job_name="worker", task_index=i, config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec, job_name="ps", task_index=i, protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

প্রসেস ক্লাস্টার সেটআপটি প্রায়শই আমাদের ইউনিট পরীক্ষায় ব্যবহৃত হয়। এখানে একটি উদাহরণ

একটি ParameterServerStrategy সার্ভারস্ট্রেটিজি ইনস্ট্যান্ট করুন

আমরা প্রশিক্ষণ কোডে ডুব দেওয়ার আগে আসুন একটি ParameterServerStrategy সার্ভারস্ট্রেজি অবজেক্টটি ইনস্ট্যান্ট করি। মনে রাখবেন যে আপনি কাস্টম প্রশিক্ষণ লুপ বা Model.fit নিয়ে এগিয়ে Model.fit তা নির্বিশেষে Model.fit variable_partitioner যুক্তি পরবর্তী বিভাগে ব্যাখ্যা করা হবে।

variable_partitioner = (
    tf.distribute.experimental.partitioners.FixedShardsPartitioner(
        num_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:18923', 'localhost:24530'], 'worker': ['localhost:19174', 'localhost:22565', 'localhost:23430']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:18923', 'localhost:24530'], 'worker': ['localhost:19174', 'localhost:22565', 'localhost:23430']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0

প্রশিক্ষণের জন্য জিপিইউগুলি ব্যবহার করতে প্রতিটি কর্মীর জন্য দৃশ্যমান জিপিইউ বরাদ্দ করুন। ParameterServerStrategy সার্ভারস্ট্রেজি প্রতিটি কর্মীর উপর উপলব্ধ সমস্ত জিপিইউ ব্যবহার করবে, এই বিধিনিষেধের সাথে যে সমস্ত শ্রমিকের একই সংখ্যক জিপিইউ উপলব্ধ রয়েছে should

পরিবর্তনশীল শার্পিং

ভেরিয়েবল শারডিং বলতে ভেরিয়েবলকে একাধিক ছোট ভেরিয়েবলগুলিতে বিভক্ত করতে বোঝায়। আমরা এই ছোট ভেরিয়েবলগুলি শারড গুলি বলি। এই শার্পগুলি অ্যাক্সেস করার সময় ভেরিয়েবল শারডিং নেটওয়ার্ক লোড বিতরণে কার্যকর হতে পারে। একাধিক প্যারামিটার সার্ভারগুলিতে একটি সাধারণ ভেরিয়েবলের গণনা এবং স্টোরেজ বিতরণেও এটি দরকারী।

পরিবর্তনশীল sharding সক্ষম করতে, আপনাকে একটি প্রেরণ করতে পারেন variable_partitioner যখন একটি নির্মাণের ParameterServerStrategy অবজেক্ট। variable_partitioner পার্টিশনকারীটি প্রতিবার একটি ভেরিয়েবল তৈরি করা হবে এবং ভেরিয়েবলের প্রতিটি মাত্রা বরাবর শারডের সংখ্যা প্রত্যাশা করা হবে। কিছু আউট অফ-বাক্স variable_partitioner tf.distribute.experimental.partitioners.FixedShardsPartitioner সরবরাহ করা হয় যেমন tf.distribute.experimental.partitioners.FixedShardsPartitioner

যখন কোনও variable_partitioner পাস করা হয় এবং আপনি যদি strategy.scope() আওতায় সরাসরি একটি ভেরিয়েবল তৈরি করেন, এটি একটি variables সম্পত্তি সহ একটি ধারক প্রকারে পরিণত হবে যা শার্দের তালিকার অ্যাক্সেস সরবরাহ করে। বেশিরভাগ ক্ষেত্রেই, এই ধারকটি সমস্ত শার্দের সাথে যুক্ত হয়ে স্বয়ংক্রিয়ভাবে একটি টেনসরে রূপান্তরিত হবে। ফলস্বরূপ, এটি একটি সাধারণ পরিবর্তনশীল হিসাবে ব্যবহার করা যেতে পারে। অন্যদিকে, কিছু টেনসরফ্লো পদ্ধতি যেমন tf.nn.embedding_lookup এই ধারক প্রকারের জন্য কার্যকর বাস্তবায়ন সরবরাহ করে এবং এই পদ্ধতিগুলিতে স্বয়ংক্রিয়ভাবে tf.nn.embedding_lookup এড়ানো হবে।

আরও বিশদের জন্য অনুগ্রহ করে ParameterServerStrategy সার্ভারস্ট্রেজি এর এপিআই ডাস্ট্রিং দেখুন।

Model.fit সাথে প্রশিক্ষণ

কেরাস Model.fit মাধ্যমে ব্যবহারযোগ্য একটি সহজ প্রশিক্ষণ এপিআই সরবরাহ করে যা হুডের অধীনে প্রশিক্ষণের লুপটি পরিচালনা করে, ওভারডেজেবল train_step এবং train_step যা টেকনরবোর্ডের জন্য চেকপয়েন্ট সংরক্ষণ বা সংক্ষিপ্ত সংরক্ষণের মতো কার্যকারিতা সরবরাহ করে। Model.fit সাহায্যে কৌশল অবজেক্টের একটি সাধারণ অদলবদল সহ অন্যান্য কৌশলগুলির জন্য একই প্রশিক্ষণ কোডটি ব্যবহার করা যেতে পারে।

তথ্য অন্তর্ভুক্তী

পরামিতি সার্ভার প্রশিক্ষণের সাথে Model.fit জন্য প্রয়োজন হয় যে কলযোগ্য ইনপুট ডেটা সরবরাহ করা উচিত যা tf.distribute.InputContext টাইপের একক যুক্তি নেয় এবং একটিtf.data.Dataset । তারপরে, একটি tf.keras.utils.experimental.DatasetCreator অবজেক্ট তৈরি করুন যা এই জাতীয় callable এবং একটি tf.distribute.InputOptions input_options . input_options যুক্তির মাধ্যমে input_options অবজেক্ট object মনে রাখবেন যে প্যারামিটার সার্ভার প্রশিক্ষণের সাথে ডেটাটি steps_per_epoch এবং পুনরাবৃত্তি করার পরামর্শ দেওয়া হয় এবং fit কলের সাথে steps_per_epoch নির্দিষ্ট করুন যাতে গ্রন্থাগারটি যুগের সীমানা জানে।

ইনপুট InputContext যুক্তি সম্পর্কে আরও তথ্যের জন্য দয়া করে বিতরণ করা ইনপুট গাইড দেখুন InputContext

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))
  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)
  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fn এর dataset_fn প্রতিটি কর্মী মেশিনে ইনপুট ডিভাইসে, যা সাধারণত সিপিইউ হয়, তাতে আহ্বান জানানো হবে।

মডেল নির্মাণ এবং সংকলন

এখন, আপনি একটি tf.keras.Model API গুলি (একটি তুচ্ছ tf.keras.models.Sequential মডেল এখানে বিক্ষোভ হিসাবে ব্যবহার করা হচ্ছে) সঙ্গে একটি tf.keras.Model তৈরি করতে হবে, তারপরে অপ্টিমাইজারের মতো উপাদানগুলি Model.compile করার জন্য একটি Model.compile কল অনুসরণ করবে, মেট্রিক্স, বা পরামিতি যেমন steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

কলব্যাকস এবং প্রশিক্ষণ

আপনি প্রকৃত প্রশিক্ষণের জন্য model.fit কল করার আগে আসুন সাধারণ কাজের জন্য প্রয়োজনীয় কলব্যাকগুলি প্রস্তুত করুন যেমন:

  • ModelCheckpoint - মডেল ওজন সংরক্ষণ করতে।

  • BackupAndRestore - প্রশিক্ষণের অগ্রগতিটি স্বয়ংক্রিয়ভাবে ব্যাক আপ হয়েছে কিনা তা নিশ্চিত করতে এবং যদি ক্লাস্টারটি অপ্রাপ্যতা অনুভব করে (যেমন গর্ভপাত বা প্রিম্পশন) অনুভব করে বা পুনরুদ্ধার করে,

  • TensorBoard - অগ্রগতি প্রতিবেদনগুলি সংক্ষিপ্ত ফাইলগুলিতে সংরক্ষণ করতে যা টেনসরবার্ড সরঞ্জামটিতে দৃশ্যমান হয়।

নোট করুন যে কর্মক্ষমতা বিবেচনার কারণে, ParameterServerStrategy সার্ভারস্ট্রেটজি ব্যবহার করার সময় কাস্টম কলব্যাকগুলিতে ব্যাচ স্তরের কলব্যাকগুলি ওভাররাইড করা যাবে না। আপনার কাস্টম কলব্যাকগুলি steps_per_epoch লেভেল কলগুলি করতে সংশোধন করুন এবং steps_per_epoch উপযুক্ত মানের সাথে সামঞ্জস্য করুন। steps_per_epoch , ParameterServerStrategy Model.fit ব্যবহার করার সময় Model.fit জন্য Model.fit একটি প্রয়োজনীয় যুক্তি।

working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
20/20 - 3s - loss: 0.4391
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 2/5
20/20 - 0s - loss: 0.4103
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.3056
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7fefd40207b8> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7fee7553b730> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2992
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.2698
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
<tensorflow.python.keras.callbacks.History at 0x7fefe405c048>

ClusterCoordinator সাথে সরাসরি ব্যবহার ( ClusterCoordinator )

এমনকি আপনি Model.fit প্রশিক্ষণের পথটি বেছে Model.fit , আপনি শ্রমিকদের উপর মৃত্যুদন্ড কার্যকর করতে চান এমন অন্যান্য ক্রিয়াকলাপ নির্ধারণের জন্য ClusterCoordinator অবজেক্টটি বৈকল্পিকভাবে ইনস্ট্যান্ট করতে পারেন। আরও বিশদ এবং উদাহরণের জন্য কাস্টম প্রশিক্ষণ লুপ বিভাগ সহ প্রশিক্ষণ দেখুন।

কাস্টম প্রশিক্ষণ লুপ সঙ্গে প্রশিক্ষণ

tf.distribute.Strategy সহ কাস্টম প্রশিক্ষণ লুপ প্রশিক্ষণ লুপগুলি সংজ্ঞায়িত করতে দুর্দান্ত নমনীয়তা সরবরাহ করে। ParameterServerStrategy ClusterCoordinator উপরোক্ত সংজ্ঞায়িত করে, আপনি প্রত্যন্ত শ্রমিকদের প্রশিক্ষণের পদক্ষেপগুলি কার্যকর করার জন্য একটি ClusterCoordinator ব্যবহার করবেন।

তারপরে, আপনি একটি মডেল তৈরি করবেন, একটি ডেটাसेट এবং একটি পদক্ষেপ ফাংশন সংজ্ঞায়িত করবেন যেমন আমরা অন্যান্য tf.distribute.Strategy গুলি এর সাথে প্রশিক্ষণ লুপে দেখেছি। আপনি এই টিউটোরিয়ালে আরও বিশদ খুঁজে পেতে পারেন।

দক্ষ ডেটাসেট প্রিফেচিং নিশ্চিত করতে, নীচের প্রত্যন্ত কর্মীদের বিভাগে প্রেরণ প্রশিক্ষণ পদক্ষেপে উল্লিখিত প্রস্তাবিত বিতরণ করা ডেটাসেট তৈরির API গুলি ব্যবহার করুন। এছাড়াও, কর্মীদের উপর বরাদ্দকৃত জিপিইউগুলির পুরো সুবিধা নেওয়ার জন্য কর্মী_আফএন'র অভ্যন্তরে strategy.run কল করতে ভুলবেন না। অন্যান্য পদক্ষেপগুলি জিপিইউ সহ বা ছাড়াই প্রশিক্ষণের জন্য একই।

আসুন নিম্নলিখিত পদক্ষেপগুলিতে এই উপাদানগুলি তৈরি করুন:

ডেটা সেটআপ করুন

প্রথমে এমন একটি ফাংশন লিখুন যা একটি ডেটাসেট তৈরি করে যার মধ্যে কেরাস প্রিপ্রোসেসিং স্তরগুলি প্রয়োগ করে প্রিপ্রোসেসিং যুক্তি অন্তর্ভুক্ত। আমরা এই স্তরগুলি dataset_fn বাইরে তৈরি করব তবে dataset_fn অভ্যন্তরে রূপান্তরটি প্রয়োগ dataset_fn যেহেতু আপনি dataset_fn একটি tf.function যা এর ভিতরে ভেরিয়েবলগুলি তৈরি করতে দেয় না।

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong",
    "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = kpl.StringLookup(vocabulary=feature_vocab)

  label_lookup_layer = kpl.StringLookup(vocabulary=label_vocab,
                                        num_oov_indices=0,
                                        mask_token=None)

  raw_feature_input = keras.layers.Input(
      shape=(3,), dtype=tf.string, name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = keras.Model(
      {"features": raw_feature_input}, feature_id_input)

  raw_label_input = keras.layers.Input(
      shape=(1,), dtype=tf.string, name="label")
  label_id_input = label_lookup_layer(raw_label_input)
  label_preprocess_stage = keras.Model({"label": raw_label_input}, label_id_input)

একটি ডেটাসেটে খেলনার উদাহরণ তৈরি করুন:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

তারপরে আমরা একটি ডেটাসেট_ফএনতে আবৃত প্রশিক্ষণ ডেটাসেট তৈরি করি:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

মডেল তৈরি করুন

দ্বিতীয়ত, আমরা মডেল এবং অন্যান্য অবজেক্ট তৈরি করি। অধীনস্থ সকল ভেরিয়েবল তৈরি করতে নিশ্চিত করুন strategy.scope

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with KPLs.
  model_input = keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=20)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = keras.Model({"features": model_input}, dense_output)

  optimizer = keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = keras.metrics.Accuracy()

আসুন নিশ্চিত করুন যে FixedShardsPartitioner ব্যবহার সমস্ত ভেরিয়েবলকে দুটি FixedShardsPartitioner বিভক্ত করে এবং প্রতিটি FixedShardsPartitioner বিভিন্ন প্যারামিটার সার্ভারগুলিতে বরাদ্দ করা হয়েছিল:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (5, 20)
assert emb_layer.weights[1].shape == (4, 20)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

প্রশিক্ষণের পদক্ষেপটি সংজ্ঞায়িত করুন

তৃতীয়, একটি tf.function মোড়ানো প্রশিক্ষণ পদক্ষেপ তৈরি করুন:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

উপরে পদক্ষেপ ফাংশন ইন, কলিং strategy.run এবং strategy.reduce মধ্যে step_fn কর্মী প্রতি একাধিক GPU সমর্থন করতে পারে। শ্রমিকদের জিপিইউ বরাদ্দ করে থাকেন, strategy.run একাধিক প্রতিলিপি উপর ডেটাসেট বিতরণ করবে।

প্রত্যন্ত শ্রমিকদের প্রশিক্ষণের পদক্ষেপগুলি প্রেরণ করুন

সমস্ত গণনাগুলি ParameterServerStrategy ClusterCoordinator দ্বারা সংজ্ঞায়িত করার পরে, আমরা সংস্থান তৈরি করতে এবং প্রত্যন্ত কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি বিতরণ করতে ClusterCoordinator শ্রেণিটি ব্যবহার করব।

প্রথমে একটি ClusterCoordinator অবজেক্ট তৈরি করুন এবং কৌশল অবজেক্টে পাস করুন:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

তারপরে আমরা প্রতি-কর্মী ডেটাসেট এবং একটি পুনরুক্তি তৈরি করব। ইন per_worker_dataset_fn নীচে মোড়কে dataset_fn মধ্যে strategy.distribute_datasets_from_function জিপিইউ দক্ষ পূর্বআনয়ন অঙ্গীভূতভাবে করার অনুমতি বাঞ্ছনীয়।

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

চূড়ান্ত পদক্ষেপটি schedule ব্যবহার করে প্রত্যন্ত কর্মীদের কাছে গণনা বিতরণ করা। schedule পদ্ধতিটি একটি tf.function এবং অবিলম্বে একটি ভবিষ্যতের মত RemoteValue ফিরে। RemoteValue দূরবর্তী কর্মীদের কাছে পটভূমির থ্রেডগুলিতে প্রেরণ করা হবে এবং RemoteValue পূর্ণ হবে। join পদ্ধতিটি সমস্ত নির্ধারিত ফাংশনগুলি উত্তেজিত না হওয়া পর্যন্ত অপেক্ষা করতে ব্যবহৃত হতে পারে।

num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.818750.
Finished epoch 1, accuracy is 1.000000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

এখানে আপনি কীভাবে একটি RemoteValue ফলাফল আনতে পারেন:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.004459

বিকল্পভাবে, আপনি সমস্ত পদক্ষেপগুলি চালু করতে এবং সমাপ্তির জন্য অপেক্ষা করতে করতে কিছু করতে পারেন:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

এই নির্দিষ্ট উদাহরণের জন্য সম্পূর্ণ প্রশিক্ষণ এবং কর্মপ্রবাহের জন্য, দয়া করে এই পরীক্ষাটি দেখুন

ডেটাसेट তৈরি সম্পর্কে আরও

উপরের কোডে create_per_worker_dataset API ব্যবহার করে তৈরি করা হয়েছে। এটি প্রতি শ্রমিকের জন্য একটি ডেটাসেট তৈরি করে এবং একটি ধারক বস্তু দেয়। প্রতি কর্মী পুনরুক্তি তৈরির জন্য আপনি এটিতে iter পদ্ধতিটি কল করতে পারেন। প্রতি কর্মী পুনরাবৃত্তকারীটিতে প্রতিটি শ্রমিকের জন্য একটি পুনরাবৃত্ত থাকে এবং একটি নির্দিষ্ট কর্মীর উপর ফাংশন কার্যকর হওয়ার আগে schedule পদ্ধতিতে পাস করা ফাংশনের ইনপুট আর্গুমেন্টে কোনও শ্রমিকের সম্পর্কিত স্লাইস প্রতিস্থাপন করা হবে।

বর্তমানে schedule পদ্ধতিটি শ্রমিকরা সমতুল্য এবং এইভাবে ধরে নিয়েছে যে বিভিন্ন শ্রমিকের ডেটাসেটগুলি একই রকম হয় যদি না তারা যদি কোনও ডেটাসেট.শ্যাফেল অপারেশন থাকে তবে ভিন্নভাবে পরিবর্তিত হতে পারে। এ কারণে আমরা ডেটাসেটগুলিকে অনির্দিষ্টকালের জন্য পুনরাবৃত্তি করার পরামর্শ দিই এবং ডেটাসেট থেকে OutOfRangeError উপর নির্ভর না করে সীমাবদ্ধ পদক্ষেপের সময়সূচি নির্ধারণ করি।

আরেকটি গুরুত্বপূর্ণ দ্রষ্টব্য হ'ল tf.data ডেটাসেটগুলি কার্য সীমানা জুড়ে অন্তর্ভুক্ত সিরিয়ালাইজেশন এবং tf.data সমর্থন করে না। সুতরাং create_per_worker_dataset এ ফাংশনটি পাস করে পুরো ডেটাসেট তৈরি করা গুরুত্বপূর্ণ।

মূল্যায়ন

বিতরণ প্রশিক্ষণে মূল্যায়ন লুপটি সংজ্ঞায়িত ও পরিচালনা করার একাধিক উপায় রয়েছে। নীচে বর্ণিত প্রত্যেকটির নিজস্ব মতামত রয়েছে। আপনার পছন্দ না থাকলে ইনলাইন মূল্যায়ন পদ্ধতির প্রস্তাব দেওয়া হয়।

ইনলাইন মূল্যায়ন

এই পদ্ধতিতে সমন্বয়কারী প্রশিক্ষণ এবং মূল্যায়নের মধ্যে পরিবর্তিত হয় এবং আমরা একে ইনলাইন মূল্যায়ন বলে থাকি। ইনলাইন মূল্যায়নের বিভিন্ন সুবিধা রয়েছে। উদাহরণস্বরূপ, এটি বৃহত্তর মূল্যায়ন মডেল এবং মূল্যায়ন ডেটাসেটগুলিকে সমর্থন করতে পারে যা কোনও একক টাস্ক ধরে রাখতে পারে না। অন্য উদাহরণের জন্য, মূল্যায়ন ফলাফলগুলি পরবর্তী যুগের প্রশিক্ষণের সিদ্ধান্ত নিতে ব্যবহার করা যেতে পারে।

ইনলাইন মূল্যায়ন কার্যকর করার দুটি উপায় রয়েছে:

  • প্রত্যক্ষ মূল্যায়ন - ছোট মডেল এবং মূল্যায়ন ডেটাসেটের জন্য সমন্বয়কারী সমন্বয়কের উপর মূল্যায়ন ডেটাসেটের সাহায্যে সরাসরি বিতরণকারী মডেলটিতে মূল্যায়ন পরিচালনা করতে পারেন:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • বিতরণ মূল্যায়ন - বৃহত্তর মডেল বা ডেটাসেটের জন্য যা সরাসরি সমন্বয়কের উপর চালানো অপরিহার্য, সমন্বয়কারী টাস্ক সময় schedule / join পদ্ধতিগুলির মাধ্যমে কর্মীদের মূল্যায়ন কার্য বিতরণ করতে পারে:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

পার্শ্ব-গাড়ী মূল্যায়ন

অন্য পদ্ধতিটিকে সাইড-কার মূল্যায়ন বলা হয় যা একটি উত্সর্গীকৃত মূল্যায়নকারী টাস্ক তৈরি করা হয় যা বারবার চেকপয়েন্টগুলি পড়ে এবং সর্বশেষ চেকপয়েন্টে মূল্যায়ন চালায়। মূল্যায়নের ফলাফলের উপর ভিত্তি করে যদি আপনার প্রশিক্ষণের লুপটি পরিবর্তন করার প্রয়োজন না হয় তবে এটি আপনার প্রশিক্ষণ প্রোগ্রামটি প্রথম দিকে শেষ করতে দেয়। যাইহোক, মূল্যায়ন ট্রিগার করতে এটির জন্য অতিরিক্ত মূল্যায়নকারী কার্য এবং পর্যায়ক্রমিক চেকপয়েন্টিং প্রয়োজন। নিম্নলিখিত একটি সম্ভাব্য পার্শ্ব-গাড়ী মূল্যায়ন লুপ:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

রিয়েল-ওয়ার্ল্ডে ক্লাস্টার্স

সত্যিকারের উত্পাদন পরিবেশে, আপনি বিভিন্ন মেশিনে বিভিন্ন প্রক্রিয়াতে সমস্ত কাজ পরিচালনা করবেন। প্রতিটি টাস্কে ক্লাস্টার তথ্য কনফিগার করার সহজ উপায় হ'ল "টিএফ_সিএনএফআইজি" এনভায়রনমেন্ট ভেরিয়েবল সেট করা এবং "TF_CONFIG" পার্স করার জন্য tf.distribute.cluster_resolver.TFConfigClusterResolver ব্যবহার করা। "TF_CONFIG" পরিবেশের ভেরিয়েবল সম্পর্কে সাধারণ বিবরণের জন্য দয়া করে বিতরণ প্রশিক্ষণ গাইডটি দেখুন

আপনি যদি কুবেরনেটস বা অন্যান্য কনফিগারেশন টেম্পলেট ব্যবহার করে আপনার প্রশিক্ষণের কাজগুলি শুরু করেন তবে খুব সম্ভবত এই টেমপ্লেটগুলি আপনার জন্য "TF_CONFIG" সেট করেছে have

"TF_CONFIG" পরিবেশ পরিবর্তনশীল সেট করুন

ধরুন আপনার কাছে 3 জন কর্মী এবং 2 টি প্যারামিটার সার্ভার রয়েছে, কর্মী 1 এর "TF_CONFIG" হতে পারে:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

মূল্যায়নকারীর "TF_CONFIG" হতে পারে:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

মূল্যায়নের জন্য উপরের "TF_CONFIG" স্ট্রিংয়ের "ক্লাস্টার" অংশটি isচ্ছিক।

আপনি যদি সমস্ত কাজের জন্য একই বাইনারি ব্যবহার করেন

আপনি যদি একক বাইনারি ব্যবহার করে এই সমস্ত কাজ পরিচালনা করতে পছন্দ করেন তবে আপনার প্রোগ্রামের শাখাকে একেবারে শুরুতে বিভিন্ন ভূমিকাতে দেওয়া দরকার:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # run side-car evaluation
else:
  # run the coordinator.

নিম্নলিখিত কোডটি একটি টেনসরফ্লো সার্ভার শুরু করে এবং অপেক্ষা করে:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

হ্যান্ডলিং টাস্ক ব্যর্থতা

শ্রমিকের ব্যর্থতা

ClusterCoordinator বা Model.fit শ্রমিকদের ব্যর্থতার জন্য অন্তর্নির্মিত ত্রুটি সহিষ্ণুতা সরবরাহ করে। কর্মী পুনরুদ্ধার করার পরে, পূর্বে ডেটা সেটটি ফাংশন প্রদান করা (হয় করতে create_per_worker_dataset সিটিএল, অথবা DatasetCreator জন্য Model.fit ডেটাসেট সৃষ্টি পুনরায়) শ্রমিকের উপর প্রার্থনা করা হবে না।

প্যারামিটার সার্ভার বা সমন্বয়কারী ব্যর্থতা

যাইহোক, যখন সমন্বয়কারী একটি প্যারামিটার সার্ভার ত্রুটি দেখেন, এটি একটি উঠাবে UnavailableError বা AbortedError অবিলম্বে। আপনি এক্ষেত্রে সমন্বয়কারী পুনরায় চালু করতে পারেন। সমন্বয়কারী নিজেও অনুপলব্ধ হতে পারে। অতএব, প্রশিক্ষণের অগ্রগতিটি হারাতে না দেওয়ার জন্য নির্দিষ্ট সরঞ্জামদানের প্রস্তাব দেওয়া হচ্ছে:

  • Model.fit জন্য আপনার BackupAndRestore কলব্যাক ব্যবহার করা উচিত, যা স্বয়ংক্রিয়ভাবে অগ্রগতি সংরক্ষণ এবং পুনরুদ্ধার পরিচালনা করে। উদাহরণের জন্য উপরের কলব্যাকস এবং প্রশিক্ষণ বিভাগটি দেখুন।

  • সিটিএলগুলির জন্য, প্রশিক্ষণ শুরুর আগে আপনার নিয়মিতভাবে মডেল ভেরিয়েবলগুলি পরীক্ষা করা উচিত এবং প্রশিক্ষণ শুরুর আগে একটি চেকপয়েন্ট থেকে মডেল ভেরিয়েবলগুলি লোড করা উচিত। প্রশিক্ষক অগ্রগতিটি অপ্টিমাইজারের চেকপয়েন্ট থাকলেই optimizer.iterations থেকে অনুমান করা যেতে পারে:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

একটি RemoteValue

একটি ফাংশন সফলভাবে সম্পাদন করা হলে একটি RemoteValue ক্ষেত্রে গ্যারান্টি দেওয়া হয়। এটি কারণ কারণ বর্তমানে কোনও ফাংশন কার্যকর হওয়ার পরে রিটার্ন মানটি সমন্বয়কের কাছে অনুলিপি করা হয়। অনুলিপি চলাকালীন যদি কোনও শ্রমিকের ব্যর্থতা থাকে তবে ফাংশনটি অন্য উপলব্ধ কর্মীর কাছে আবার চেষ্টা করা হবে। অতএব, আপনি যদি পারফরম্যান্সের জন্য অনুকূল করতে চান তবে আপনি কোনও রিটার্ন মান ছাড়াই ফাংশনগুলি নির্ধারণ করতে পারেন।

ভূল প্রতিবেদন

একবার সমন্বয়কারী হিসাবে যেমন একটি ত্রুটি দেখেন UnavailableError যেমন একটি হিসাবে পরামিতি সার্ভার ও অন্যান্য অ্যাপ্লিকেশান ত্রুটিগুলি থেকে InvalidArgument থেকে tf.debugging.check_numerics , এটা ত্রুটি উত্থাপন আগে সব মুলতুবি আছে ও সারিবদ্ধ ফাংশন বাতিল করবেন। তাদের সম্পর্কিত RemoteValue CancelledError RemoteValue বাড়িয়ে RemoteValue

ত্রুটি উত্থাপিত হওয়ার পরে, সমন্বয়কারী বাতিল ফাংশনগুলি থেকে একই ত্রুটি বা কোনও ত্রুটি বাড়িয়ে তুলবে না।

কর্মক্ষমতা বৃদ্ধি

ParameterServerStrategy ClusterResolver এবং ClusterResolver দিয়ে প্রশিক্ষণ দেওয়ার সময় আপনি পারফরম্যান্সের সমস্যাগুলি দেখতে পান এমন বেশ কয়েকটি সম্ভাব্য কারণ ClusterResolver

একটি সাধারণ কারণ হ'ল প্যারামিটার সার্ভারগুলিতে ভারসাম্যহীন ভার রয়েছে এবং কিছু ভারী-লোড প্যারামিটার সার্ভারের দক্ষতা পৌঁছেছে। একাধিক মূল কারণও থাকতে পারে। এই সমস্যাটি প্রশমিত করার কয়েকটি সহজ পদ্ধতি হ'ল

  1. একটি নির্দিষ্ট মাধ্যমে আপনার বৃহৎ মডেল ভেরিয়েবল ঠিকরা variable_partitioner যখন একটি নির্মাণের ParameterServerStrategy
  2. হটস্পট ভেরিয়েবল তৈরি করা এড়িয়ে চলুন যা সম্ভব হলে একক পদক্ষেপে সমস্ত পরামিতি সার্ভারের দ্বারা প্রয়োজনীয়। উদাহরণস্বরূপ, ডিফল্ট আচরণের কারণে অপ্টিমাইজারগুলিতে একটি ধ্রুবক শেখার হার বা সাবক্লাস tf.keras.optimizers.schedules.LearningRateSchedule ব্যবহার করুন যে শিক্ষার হার একটি নির্দিষ্ট প্যারামিটার সার্ভারে রাখা যায় এবং প্রতিটি পদক্ষেপে সমস্ত অন্যান্য প্যারামিটার সার্ভার দ্বারা অনুরোধ করা হয় ।
  3. আপনার বড় শব্দভান্ডারগুলি কেরাস প্রিপ্রোসেসিং স্তরগুলিতে দেওয়ার আগে এলোমেলো করুন।

পারফরম্যান্স সমস্যাগুলির জন্য আর একটি সম্ভাব্য কারণ হলেন সমন্বয়কারী। আমাদের schedule / join প্রথম প্রয়োগটি পাইথন-ভিত্তিক এবং এভাবে থ্রেডিং ওভারহেড থাকতে পারে। সমন্বয়কারী এবং কর্মীদের মধ্যে বিলম্বিতা বড় হতে পারে। এই যদি হয় তাহলে,

  • Model.fit , আপনি steps_per_execution এ প্রদত্ত steps_per_execution আর্গুমেন্টটি 1 এর চেয়ে বড় একটি Model.compile সেট করতে পারেন।

  • সিটিএল-এর জন্য, আপনি একক tf.function একাধিক পদক্ষেপ প্যাক করতে পারেন:

steps_per_invocation = 10
@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

আমরা লাইব্রেরিটি অপ্টিমাইজ করতে থাকি, আমরা আশা করি বেশিরভাগ ব্যবহারকারীদের ভবিষ্যতে ম্যানুয়ালি পদক্ষেপগুলি প্যাক করতে হবে না।

তদ্ব্যতীত, কার্যকারিতা উন্নতির জন্য একটি ছোট কৌশলটি উপরের হ্যান্ডলিং টাস্ক ব্যর্থতার বিভাগে বর্ণিত হিসাবে কোনও রিটার্ন মান ছাড়াই ফাংশনগুলি নির্ধারণ করে।

সীমাবদ্ধতা জ্ঞাত

বেশিরভাগ জ্ঞাত সীমাবদ্ধতা উপরের বিভাগগুলিতে আচ্ছাদিত। এই বিভাগটি একটি সংক্ষিপ্তসার সরবরাহ করে।

ParameterServerStrategy সাধারণ

  • os.environment["grpc_fail_fast"]="use_caller" ত্রুটি সহিষ্ণুতা সঠিকভাবে কাজ করার জন্য সমন্বয়কারী সহ প্রতিটি কাজে os.environment["grpc_fail_fast"]="use_caller" দরকার।
  • সিঙ্ক্রোনাস প্যারামিটার সার্ভার প্রশিক্ষণ সমর্থিত নয়।
  • অনুকূল কর্মক্ষমতা অর্জনের জন্য সাধারণত একক ফাংশনে একাধিক পদক্ষেপ প্যাক করা প্রয়োজন to
  • এটা তোলে মাধ্যমে একটি saved_model লোড করতে সমর্থিত নয় tf.saved_model.load sharded ভেরিয়েবল রয়েছে। নোট টেনসরফ্লো পরিবেশন ব্যবহার করে যেমন একটি সংরক্ষিত_মাডেল লোড করা কাজ করবে বলে আশা করা হচ্ছে।
  • শারডযুক্ত অপ্টিমাইজার স্লট ভেরিয়েবলকে বিভিন্ন সংখ্যার শারডে সংযুক্ত করে একটি চেকপয়েন্ট লোড করার জন্য এটি সমর্থন করে না।
  • সমন্বয়কারী টাস্কটি আরম্ভ না করে প্যারামিটার সার্ভার ব্যর্থতা থেকে পুনরুদ্ধার করতে এটি সমর্থিত নয়।

Model.fit নির্দিষ্টকরণ

  • steps_per_epoch যুক্তি প্রয়োজন বোধ করা হয় Model.fit । আপনি একটি মান নির্বাচন করতে পারেন যা একটি যুগের মধ্যে উপযুক্ত বিরতি সরবরাহ করে।
  • ParameterServerStrategy সার্ভারস্ট্রেটগির কাস্টম কলব্যাকগুলির পক্ষে সমর্থন নেই যা পারফরম্যান্সের কারণে ব্যাচ-স্তর কল করে। আপনার সেই কলগুলি যথাযথভাবে বাছাই করা steps_per_epoch সাথে যুগের স্তরের কলগুলিতে রূপান্তর করা উচিত, যাতে তাদের প্রতিটি steps_per_epoch সংখ্যা ধাপে ডাকা হয়। অন্তর্নির্মিত কলব্যাকগুলি প্রভাবিত হয় না: তাদের ব্যাচ-স্তর কলগুলি পারফরম্যান্ট হিসাবে সংশোধন করা হয়েছে। ParameterServerStrategy জন্য ব্যাচ-স্তরের কলগুলিকে সমর্থন করার পরিকল্পনা করা হচ্ছে।
  • একই কারণে, অন্যান্য কৌশলগুলির বিপরীতে, অগ্রগতি বার এবং মেট্রিকগুলি কেবল যুগের সীমানায় লগ হয়।
  • Model.fit জন্য ইনপুট কেবলমাত্র DatasetCreator টাইপ DatasetCreator
  • run_eagerly সমর্থিত নয়।
  • Model.fit মূল্যায়ন এখনও সমর্থিত নয়। এটি অন্যতম অগ্রাধিকার।
  • Model.evaluate এবং Model.predict এখনও সমর্থিত নয়।

কাস্টম প্রশিক্ষণ লুপ নির্দিষ্টকরণ