Google I/O-তে TensorFlow-এ যোগ দিন, 11-12 মে এখনই নিবন্ধন করুন

প্যারামিটার সার্ভার স্ট্র্যাটেজি সহ প্যারামিটার সার্ভার প্রশিক্ষণ

TensorFlow.org এ দেখুন Google Colab-এ চালান GitHub-এ উৎস দেখুন নোটবুক ডাউনলোড করুন

ওভারভিউ

প্যারামিটার সার্ভার প্রশিক্ষণ একাধিক মেশিনে মডেল প্রশিক্ষণ স্কেল আপ করার জন্য একটি সাধারণ ডেটা-সমান্তরাল পদ্ধতি।

একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টারে কর্মী এবং পরামিতি সার্ভার থাকে । ভেরিয়েবলগুলি প্যারামিটার সার্ভারে তৈরি করা হয় এবং সেগুলি প্রতিটি ধাপে কর্মীদের দ্বারা পড়া এবং আপডেট করা হয়। ডিফল্টরূপে, কর্মীরা একে অপরের সাথে সিঙ্ক্রোনাইজ না করে স্বাধীনভাবে এই ভেরিয়েবলগুলি পড়ে এবং আপডেট করে। এই কারণেই কখনও কখনও প্যারামিটার সার্ভার-স্টাইল প্রশিক্ষণকে অ্যাসিঙ্ক্রোনাস প্রশিক্ষণ বলা হয়।

TensorFlow 2-এ, প্যারামিটার সার্ভার প্রশিক্ষণ tf.distribute.experimental.ParameterServerStrategy ক্লাস দ্বারা চালিত হয়, যা প্রশিক্ষণের ধাপগুলিকে একটি ক্লাস্টারে বিতরণ করে যা হাজার হাজার কর্মী (প্যারামিটার সার্ভার সহ) পর্যন্ত স্কেল করে।

সমর্থিত প্রশিক্ষণ পদ্ধতি

দুটি প্রধান সমর্থিত প্রশিক্ষণ পদ্ধতি আছে:

কাজ এবং কাজ সহ একটি ক্লাস্টার

পছন্দের API ( Model.fit বা একটি কাস্টম ট্রেনিং লুপ) যাই হোক না কেন, TensorFlow 2-এ বিতরণ করা প্রশিক্ষণ জড়িত: বেশ কয়েকটি 'jobs' সহ একটি 'cluster' , এবং প্রতিটি কাজের এক বা একাধিক 'tasks' থাকতে পারে।

প্যারামিটার সার্ভার প্রশিক্ষণ ব্যবহার করার সময়, এটি সুপারিশ করা হয়:

  • একজন সমন্বয়কারীর কাজ (যার কাজের নাম chief আছে)
  • একাধিক কর্মীর কাজ (চাকরীর নাম worker ); এবং
  • একাধিক প্যারামিটার সার্ভার কাজ (চাকরীর নাম ps )

যখন সমন্বয়কারী সংস্থান তৈরি করে, প্রশিক্ষণের কাজগুলি প্রেরণ করে, চেকপয়েন্টগুলি লেখে এবং টাস্ক ব্যর্থতার সাথে কাজ করে, তখন কর্মী এবং প্যারামিটার সার্ভারগুলি tf.distribute.Server চালায় যেগুলি সমন্বয়কারীর অনুরোধগুলি শোনে।

Model.fit API সহ প্যারামিটার সার্ভার প্রশিক্ষণ

Model.fit API-এর সাথে প্যারামিটার সার্ভার প্রশিক্ষণের জন্য সমন্বয়কারীকে একটি tf.distribute.experimental.ParameterServerStrategy অবজেক্ট এবং একটি tf.keras.utils.experimental.DatasetCreator ইনপুট হিসেবে ব্যবহার করতে হবে। কোন কৌশল ছাড়া Model.fit ব্যবহারের অনুরূপ, বা অন্যান্য কৌশলগুলির সাথে, কর্মপ্রবাহে মডেল তৈরি এবং কম্পাইল করা, কলব্যাক প্রস্তুত করা এবং একটি Model.fit কল অনুসরণ করা জড়িত।

একটি কাস্টম প্রশিক্ষণ লুপ সহ প্যারামিটার সার্ভার প্রশিক্ষণ

কাস্টম প্রশিক্ষণ লুপগুলির সাথে, tf.distribute.experimental.coordinator.ClusterCoordinator ক্লাসটি সমন্বয়কারীর জন্য ব্যবহৃত মূল উপাদান।

  • ClusterCoordinator শ্রেণীকে একটি tf.distribute.Strategy অবজেক্টের সাথে একত্রে কাজ করতে হবে।
  • এই tf.distribute.Strategy অবজেক্টটি ক্লাস্টারের তথ্য প্রদানের জন্য প্রয়োজন এবং একটি প্রশিক্ষণের ধাপ সংজ্ঞায়িত করতে ব্যবহৃত হয়, যেমনটি tf.distribute.Strategy-এর সাথে কাস্টম প্রশিক্ষণে প্রদর্শিত হয়েছে।
  • ClusterCoordinator অবজেক্ট তারপর এই প্রশিক্ষণের পদক্ষেপগুলি দূরবর্তী কর্মীদের কাছে প্রেরণ করে।
  • প্যারামিটার সার্ভার প্রশিক্ষণের জন্য, ক্লাস্টার tf.distribute.experimental.ParameterServerStrategy ClusterCoordinator সাথে কাজ করতে হবে।

ClusterCoordinator অবজেক্ট দ্বারা প্রদত্ত সবচেয়ে গুরুত্বপূর্ণ API হল schedule :

  • schedule API একটি tf.function করে এবং অবিলম্বে ভবিষ্যতের মতো RemoteValue করে।
  • সারিবদ্ধ ফাংশনগুলি পটভূমির থ্রেডগুলিতে দূরবর্তী কর্মীদের কাছে প্রেরণ করা হবে এবং তাদের RemoteValue গুলি অ্যাসিঙ্ক্রোনাসভাবে পূরণ করা হবে।
  • যেহেতু schedule জন্য কর্মী নিয়োগের প্রয়োজন হয় না, তাই পাস করা tf.function যেকোনো উপলব্ধ কর্মীর উপর কার্যকর করা যেতে পারে।
  • যে কর্মীটির উপর এটি কার্যকর করা হয়েছে সেটি সম্পূর্ণ হওয়ার আগে অনুপলব্ধ হলে, অন্য উপলব্ধ কর্মীর উপর ফাংশনটি পুনরায় চেষ্টা করা হবে।
  • এই সত্যের কারণে এবং ফাংশন এক্সিকিউশন পারমাণবিক নয়, একটি ফাংশন একাধিকবার কার্যকর করা যেতে পারে।

দূরবর্তী ফাংশন প্রেরণের পাশাপাশি, ClusterCoordinator সমস্ত কর্মীদের ডেটাসেট তৈরি করতে এবং একজন কর্মী ব্যর্থতা থেকে পুনরুদ্ধার করলে এই ডেটাসেটগুলি পুনর্নির্মাণ করতে সহায়তা করে।

টিউটোরিয়াল সেটআপ

টিউটোরিয়ালটি Model.fit এবং কাস্টম প্রশিক্ষণ লুপ পাথগুলিতে শাখা হবে এবং আপনি আপনার প্রয়োজনের সাথে মানানসই একটি বেছে নিতে পারেন। "X এর সাথে প্রশিক্ষণ" ছাড়া অন্য বিভাগগুলি উভয় পথের জন্য প্রযোজ্য।

pip install portpicker

ক্লাস্টার সেটআপ

উপরে উল্লিখিত হিসাবে, একটি প্যারামিটার সার্ভার প্রশিক্ষণ ক্লাস্টারের জন্য একটি সমন্বয়কারীর টাস্ক প্রয়োজন যা আপনার প্রশিক্ষণ প্রোগ্রাম চালায়, এক বা একাধিক কর্মী এবং প্যারামিটার সার্ভারের টাস্ক যা TensorFlow সার্ভার চালায়— tf.distribute.Server — এবং সম্ভবত একটি অতিরিক্ত মূল্যায়ন টাস্ক যা সাইড-কার মূল্যায়ন চালায়। (নীচে সাইড-কার মূল্যায়ন বিভাগটি দেখুন)। তাদের সেট আপ করার প্রয়োজনীয়তা হল:

  • সমন্বয়কারীর কাজটিকে মূল্যায়নকারী ছাড়া অন্য সব TensorFlow সার্ভারের ঠিকানা এবং পোর্ট জানতে হবে।
  • কর্মী এবং পরামিতি সার্ভারগুলি তাদের কোন পোর্ট শুনতে হবে তা জানতে হবে। সরলতার জন্য, আপনি সাধারণত এই কাজগুলিতে টেনসরফ্লো সার্ভার তৈরি করার সময় সম্পূর্ণ ক্লাস্টার তথ্য পাস করতে পারেন।
  • মূল্যায়নকারীর কাজকে প্রশিক্ষণ ক্লাস্টারের সেটআপ জানতে হবে না। যদি তা হয়, তাহলে প্রশিক্ষণ ক্লাস্টারের সাথে সংযোগ করার চেষ্টা করা উচিত নয়।
  • কর্মী এবং প্যারামিটার সার্ভারের যথাক্রমে "worker" এবং "ps" হিসাবে টাস্কের ধরন থাকা উচিত। কোঅর্ডিনেটরকে উত্তরাধিকারের কারণে টাস্ক টাইপ হিসাবে "chief" ব্যবহার করা উচিত।

এই টিউটোরিয়ালে, আপনি একটি ইন-প্রসেস ক্লাস্টার তৈরি করবেন যাতে পুরো প্যারামিটার সার্ভার প্রশিক্ষণ Colab-এ চালানো যায়। আপনি পরবর্তী বিভাগে কীভাবে আসল ক্লাস্টার সেট আপ করবেন তা শিখবেন।

ইন-প্রসেস ক্লাস্টার

আপনি আগে থেকেই বেশ কয়েকটি টেনসরফ্লো সার্ভার তৈরি করে শুরু করবেন এবং পরে তাদের সাথে সংযুক্ত হবেন। মনে রাখবেন যে এটি শুধুমাত্র এই টিউটোরিয়ালের প্রদর্শনের উদ্দেশ্যে, এবং প্রকৃত প্রশিক্ষণে সার্ভারগুলি "worker" এবং "ps" মেশিনে চালু করা হবে।

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

ইন-প্রসেস ক্লাস্টার সেটআপ প্রায়শই ইউনিট পরীক্ষায় ব্যবহৃত হয়, যেমন এখানে

স্থানীয় পরীক্ষার জন্য আরেকটি বিকল্প হল স্থানীয় মেশিনে প্রসেস চালু করা—এই পদ্ধতির উদাহরণের জন্য কেরাসের সাথে মাল্টি-কর্মী প্রশিক্ষণ দেখুন

একটি প্যারামিটার সার্ভার স্ট্র্যাটেজি চালু করুন

আপনি প্রশিক্ষণ কোডে ডুব দেওয়ার আগে, আসুন একটি ParameterServerStrategy অবজেক্টকে ইনস্ট্যান্টিয়েট করি। মনে রাখবেন যে আপনি Model.fit বা একটি কাস্টম প্রশিক্ষণ লুপের সাথে এগিয়ে যাচ্ছেন কিনা তা নির্বিশেষে এটি প্রয়োজন। variable_partitioner আর্গুমেন্টটি পরিবর্তনশীল শার্ডিং বিভাগে ব্যাখ্যা করা হবে।

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

প্রশিক্ষণের জন্য GPUs ব্যবহার করার জন্য, প্রতিটি কর্মীর জন্য দৃশ্যমান GPU বরাদ্দ করুন। ParameterServerStrategy সার্ভার স্ট্র্যাটেজি প্রতিটি কর্মীর জন্য সমস্ত উপলব্ধ জিপিইউ ব্যবহার করবে, এই সীমাবদ্ধতার সাথে যে সমস্ত কর্মীদের একই সংখ্যক জিপিইউ উপলব্ধ থাকতে হবে।

পরিবর্তনশীল শার্ডিং

ভেরিয়েবল শার্ডিং বলতে বোঝায় একটি ভেরিয়েবলকে একাধিক ছোট ভেরিয়েবলে বিভক্ত করা, যাকে শার্ড বলা হয়। এই শার্ডগুলি অ্যাক্সেস করার সময় নেটওয়ার্ক লোড বিতরণ করার জন্য পরিবর্তনশীল শার্ডিং কার্যকর হতে পারে। এটি একাধিক প্যারামিটার সার্ভার জুড়ে একটি সাধারণ ভেরিয়েবলের গণনা এবং সঞ্চয়স্থান বিতরণ করতেও দরকারী।

পরিবর্তনশীল শার্ডিং সক্ষম করতে, আপনি একটি ParameterServerStrategy সার্ভার স্ট্র্যাটেজি অবজেক্ট তৈরি করার সময় একটি variable_partitioner এ পাস করতে পারেন। যখন একটি ভেরিয়েবল তৈরি করা হয় তখন প্রতিবার variable_partitioner আহ্বান করা হবে এবং এটি ভেরিয়েবলের প্রতিটি ডাইমেনশন বরাবর শার্ডের সংখ্যা ফেরত দেবে বলে আশা করা হয়। কিছু আউট-অফ-বক্স variable_partitioner প্রদান করা হয় যেমন tf.distribute.experimental.partitioners.MinSizePartitioner । ছোট ভেরিয়েবলের বিভাজন এড়াতে tf.distribute.experimental.partitioners.MinSizePartitioner এর মতো আকার-ভিত্তিক পার্টিশনার ব্যবহার করার পরামর্শ দেওয়া হয়, যা মডেল প্রশিক্ষণের গতিতে নেতিবাচক প্রভাব ফেলতে পারে।

যখন একটি variable_partitioner পাস করা হয় এবং আপনি যদি সরাসরি strategy.scope() এর অধীনে একটি ভেরিয়েবল তৈরি করেন, এটি একটি variables বৈশিষ্ট্য সহ একটি ধারক প্রকারে পরিণত হবে যা শার্ডগুলির তালিকায় অ্যাক্সেস প্রদান করে। বেশিরভাগ ক্ষেত্রে, এই ধারকটি স্বয়ংক্রিয়ভাবে সমস্ত শার্ডগুলিকে একত্রিত করে একটি টেনসরে রূপান্তরিত হবে৷ ফলস্বরূপ, এটি একটি সাধারণ পরিবর্তনশীল হিসাবে ব্যবহার করা যেতে পারে। অন্যদিকে, কিছু TensorFlow পদ্ধতি যেমন tf.nn.embedding_lookup এই কন্টেইনার প্রকারের জন্য দক্ষ বাস্তবায়ন প্রদান করে এবং এই পদ্ধতিতে স্বয়ংক্রিয় সংযোজন এড়ানো হবে।

আরো বিস্তারিত জানার জন্য tf.distribute.experimental.ParameterServerStrategy এর API ডক্স দেখুন।

Model.fit সাথে প্রশিক্ষণ

Model.fit এর মাধ্যমে একটি সহজে-ব্যবহারযোগ্য প্রশিক্ষণ API প্রদান করে যা হুডের নিচে প্রশিক্ষণ লুপ পরিচালনা করে, ওভাররিডেবল train_step , এবং কলব্যাকের নমনীয়তা সহ, যা টেনসরবোর্ডের জন্য চেকপয়েন্ট সংরক্ষণ বা সারাংশ সংরক্ষণের মতো কার্যকারিতা প্রদান করে। Model.fit এর সাথে, কৌশল অবজেক্টের একটি সাধারণ অদলবদল সহ অন্যান্য কৌশলগুলির জন্য একই প্রশিক্ষণ কোড ব্যবহার করা যেতে পারে।

তথ্য অন্তর্ভুক্তী

প্যারামিটার সার্ভার প্রশিক্ষণের সাথে Model.fit এর জন্য ইনপুট ডেটা একটি কলেবলে সরবরাহ করা প্রয়োজন যা tf.distribute.InputContext টাইপের একটি একক আর্গুমেন্ট নেয় এবং একটি tf.data.Dataset । তারপর, একটি tf.keras.utils.experimental.DatasetCreator অবজেক্ট তৈরি করুন যা এই ধরনের callable এবং একটি ঐচ্ছিক tf.distribute.InputOptions অবজেক্ট input_options আর্গুমেন্টের মাধ্যমে নেয়।

মনে রাখবেন যে প্যারামিটার সার্ভার প্রশিক্ষণের সাথে ডেটা শাফেল এবং পুনরাবৃত্তি করার পরামর্শ দেওয়া হয় এবং fit কলে steps_per_epoch নির্দিষ্ট করুন যাতে লাইব্রেরি যুগের সীমানা জানে৷

ইনপুট InputContext আর্গুমেন্ট সম্পর্কে আরও তথ্যের জন্য অনুগ্রহ করে ডিস্ট্রিবিউটেড ইনপুট টিউটোরিয়াল দেখুন।

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fn এর কোডটি ইনপুট ডিভাইসে, যা সাধারণত CPU হয়, প্রতিটি কর্মী মেশিনে ব্যবহার করা হবে।

মডেল নির্মাণ এবং সংকলন

এখন, আপনি একটি tf.keras.Model তৈরি করবেন —একটি তুচ্ছ tf.keras.models.Sequential . প্রদর্শনের উদ্দেশ্যে অনুক্রমিক মডেল—এর পরে একটি Model.compile কলের মাধ্যমে উপাদানগুলি, যেমন একটি অপ্টিমাইজার, মেট্রিক্স, বা প্যারামিটার যেমন steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

কলব্যাক এবং প্রশিক্ষণ

আপনি প্রকৃত প্রশিক্ষণের জন্য model.fit কল করার আগে, আসুন সাধারণ কাজের জন্য প্রয়োজনীয় কলব্যাকগুলি প্রস্তুত করি, যেমন:

  • ModelCheckpoint : মডেলের ওজন সংরক্ষণ করতে।
  • BackupAndRestore : প্রশিক্ষণের অগ্রগতি স্বয়ংক্রিয়ভাবে ব্যাক আপ করা হয়েছে তা নিশ্চিত করতে এবং ক্লাস্টারটি অনুপলব্ধতা অনুভব করলে পুনরুদ্ধার করা হয় (যেমন গর্ভপাত বা প্রিম্পশন); বা
  • TensorBoard : সারাংশ ফাইলগুলিতে অগ্রগতি প্রতিবেদন সংরক্ষণ করতে, যা টেনসরবোর্ড টুলে ভিজ্যুয়ালাইজ করা হয়।
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

ClusterCoordinator সাথে সরাসরি ব্যবহার (ঐচ্ছিক)

এমনকি যদি আপনি Model.fit প্রশিক্ষণের পথ বেছে নেন, আপনি ঐচ্ছিকভাবে একটি tf.distribute.experimental.coordinator.ClusterCoordinator অবজেক্ট তৈরি করতে পারেন যাতে আপনি কর্মীদের উপর সম্পাদন করতে চান এমন অন্যান্য ফাংশন নির্ধারণ করতে পারেন। আরও বিশদ বিবরণ এবং উদাহরণের জন্য একটি কাস্টম প্রশিক্ষণ লুপ বিভাগ সহ প্রশিক্ষণ দেখুন।

একটি কাস্টম প্রশিক্ষণ লুপ সঙ্গে প্রশিক্ষণ

tf.distribute.Strategy সহ কাস্টম প্রশিক্ষণ লুপ ব্যবহার করা প্রশিক্ষণ tf.distribute.Strategy সংজ্ঞায়িত করার জন্য দুর্দান্ত নমনীয়তা প্রদান করে। উপরে সংজ্ঞায়িত ParameterServerStrategy সার্ভার স্ট্র্যাটেজির সাথে ( strategy হিসাবে), আপনি একটি tf.distribute.experimental.coordinator.ClusterCoordinator ব্যবহার করবেন দূরবর্তী কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি সম্পাদন করার জন্য।

তারপর, আপনি একটি মডেল তৈরি করবেন, একটি ডেটাসেট এবং একটি ধাপ ফাংশন সংজ্ঞায়িত করবেন, যেমন আপনি অন্যান্য tf.distribute.Strategy s-এর সাথে প্রশিক্ষণ লুপে করেছেন। আপনি tf.distribute.Strategy টিউটোরিয়াল সহ কাস্টম প্রশিক্ষণে আরও বিশদ জানতে পারেন।

দক্ষ ডেটাসেট প্রিফেচিং নিশ্চিত করতে, নীচে প্রত্যন্ত কর্মীদের বিভাগে পাঠানোর প্রশিক্ষণের ধাপে উল্লিখিত প্রস্তাবিত বিতরণ করা ডেটাসেট তৈরি API ব্যবহার করুন। এছাড়াও, শ্রমিকদের জন্য বরাদ্দকৃত GPU-এর সম্পূর্ণ সুবিধা নিতে worker_fn এর ভিতরে Strategy.run কল করতে ভুলবেন না। বাকি ধাপগুলো GPU সহ বা ছাড়া প্রশিক্ষণের জন্য একই।

আসুন নিম্নলিখিত ধাপে এই উপাদানগুলি তৈরি করি:

ডেটা সেট আপ করুন

প্রথমে, একটি ফাংশন লিখুন যা একটি ডেটাসেট তৈরি করে যাতে কেরাস প্রিপ্রসেসিং স্তরগুলি দ্বারা বাস্তবায়িত প্রিপ্রসেসিং লজিক অন্তর্ভুক্ত থাকে।

আপনি dataset_fn dataset_fn ভিতরে রূপান্তর প্রয়োগ করবেন, যেহেতু আপনি dataset_fn কে একটি tf.function এ মোড়ানো হবে, যা এর ভিতরে ভেরিয়েবল তৈরি করতে দেয় না।

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

একটি ডেটাসেটে খেলনা উদাহরণ তৈরি করুন:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

তারপর, একটি dataset_fn এ মোড়ানো প্রশিক্ষণ ডেটাসেট তৈরি করুন:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

মডেল তৈরি করুন

এর পরে, মডেল এবং অন্যান্য বস্তু তৈরি করুন। strategy.scope অধীনে সমস্ত ভেরিয়েবল তৈরি করা নিশ্চিত করুন।

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

আসুন নিশ্চিত করি যে FixedShardsPartitioner-এর ব্যবহার সমস্ত ভেরিয়েবলকে দুটি FixedShardsPartitioner বিভক্ত করেছে এবং প্রতিটি শার্ড বিভিন্ন প্যারামিটার সার্ভারে বরাদ্দ করা হয়েছে:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

প্রশিক্ষণের ধাপ সংজ্ঞায়িত করুন

তৃতীয়ত, একটি tf.function এ মোড়ানো প্রশিক্ষণের ধাপ তৈরি করুন:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

উপরের ট্রেনিং স্টেপ ফাংশনে, step_fnStrategy.run এবং Strategy.reduce কল করলে প্রতি কর্মী একাধিক GPU সমর্থন করতে পারে। যদি কর্মীদের জিপিইউ বরাদ্দ থাকে, তাহলে Strategy.run একাধিক প্রতিলিপিতে ডেটাসেট বিতরণ করবে।

প্রত্যন্ত কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি প্রেরণ করুন

ParameterServerStrategy সার্ভারস্ট্র্যাটেজি দ্বারা সমস্ত গণনা সংজ্ঞায়িত হওয়ার পরে, আপনি রিসোর্স তৈরি করতে এবং দূরবর্তী কর্মীদের প্রশিক্ষণের পদক্ষেপগুলি বিতরণ করতে tf.distribute.experimental.coordinator.ClusterCoordinator ক্লাস ব্যবহার করবেন।

প্রথমে একটি ClusterCoordinator অবজেক্ট তৈরি করি এবং কৌশল অবজেক্টে পাস করি:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

তারপরে, একটি প্রতি-কর্মী ডেটাসেট এবং একটি পুনরাবৃত্তিকারী তৈরি করুন। নিচের per_worker_dataset_fn এ, dataset_fn কে strategy.distribute_datasets_from_function এ মোড়ানোর পরামর্শ দেওয়া হয় যাতে করে GPU-তে নির্বিঘ্নে দক্ষ প্রিফেচ করা যায়।

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

চূড়ান্ত ধাপ হল ClusterCoordinator.schedule ব্যবহার করে দূরবর্তী কর্মীদের গণনা বিতরণ করা:

  • schedule পদ্ধতি একটি tf.function করে এবং অবিলম্বে ভবিষ্যতের মতো RemoteValue করে। সারিবদ্ধ ফাংশনগুলি পটভূমির থ্রেডগুলিতে দূরবর্তী কর্মীদের কাছে পাঠানো হবে এবং রিমোট RemoteValue অ্যাসিঙ্ক্রোনাসভাবে পূরণ করা হবে।
  • join পদ্ধতি ( ClusterCoordinator.join ) সমস্ত নির্ধারিত ফাংশন কার্যকর না হওয়া পর্যন্ত অপেক্ষা করতে ব্যবহার করা যেতে পারে।
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

এখানে আপনি কিভাবে একটি RemoteValue এর ফলাফল আনতে পারেন:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

বিকল্পভাবে, আপনি সমস্ত পদক্ষেপ চালু করতে পারেন এবং সমাপ্তির জন্য অপেক্ষা করার সময় কিছু করতে পারেন:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

এই বিশেষ উদাহরণের জন্য সম্পূর্ণ প্রশিক্ষণ এবং পরিবেশন কর্মপ্রবাহের জন্য, অনুগ্রহ করে এই পরীক্ষাটি দেখুন।

ডেটাসেট তৈরি সম্পর্কে আরও

উপরের কোডের ClusterCoordinator.create_per_worker_dataset API ব্যবহার করে তৈরি করা হয়েছে। এটি কর্মী প্রতি একটি ডেটাসেট তৈরি করে এবং একটি ধারক বস্তু প্রদান করে। আপনি প্রতি-কর্মী পুনরাবৃত্তিকারী তৈরি করতে এটিতে iter পদ্ধতিতে কল করতে পারেন। প্রতি-কর্মী ইটারেটারে প্রতি কর্মী প্রতি একটি ইটারেটর থাকে এবং একটি নির্দিষ্ট কর্মীর উপর ফাংশনটি কার্যকর করার আগে ClusterCoordinator.schedule পদ্ধতিতে পাস করা ফাংশনের ইনপুট আর্গুমেন্টে একজন কর্মীর অনুরূপ স্লাইস প্রতিস্থাপিত হবে।

বর্তমানে, ClusterCoordinator.schedule পদ্ধতি অনুমান করে যে শ্রমিকরা সমতুল্য এবং এইভাবে ধরে নেয় যে বিভিন্ন কর্মীদের ডেটাসেটগুলি একই, যদি তাদের মধ্যে Dataset.shuffle অপারেশন থাকে তবে সেগুলি ভিন্নভাবে শাফেল হতে পারে। এই কারণে, এটিও সুপারিশ করা হয় যে ডেটাসেটগুলি অনির্দিষ্টকালের জন্য পুনরাবৃত্তি করা হবে এবং আপনি একটি ডেটাসেট থেকে OutOfRangeError এর উপর নির্ভর করার পরিবর্তে একটি সীমিত সংখ্যক পদক্ষেপের সময় নির্ধারণ করুন৷

আরেকটি গুরুত্বপূর্ণ নোট হল যে tf.data ডেটাসেটগুলি টাস্ক সীমানা জুড়ে অন্তর্নিহিত সিরিয়ালাইজেশন এবং ডিসিরিয়ালাইজেশন সমর্থন করে না। তাই ClusterCoordinator.create_per_worker_dataset এ পাস করা ফাংশনের ভিতরে পুরো ডেটাসেট তৈরি করা গুরুত্বপূর্ণ।

মূল্যায়ন

বিতরণ করা প্রশিক্ষণে একটি মূল্যায়ন লুপ সংজ্ঞায়িত এবং চালানোর একাধিক উপায় রয়েছে। নীচে বর্ণিত হিসাবে প্রত্যেকের নিজস্ব সুবিধা এবং অসুবিধা রয়েছে। আপনার পছন্দ না থাকলে ইনলাইন মূল্যায়ন পদ্ধতিটি সুপারিশ করা হয়।

ইনলাইন মূল্যায়ন

এই পদ্ধতিতে, সমন্বয়কারী প্রশিক্ষণ এবং মূল্যায়নের মধ্যে বিকল্প করে এবং এইভাবে একে ইনলাইন মূল্যায়ন বলা হয়।

ইনলাইন মূল্যায়নের বেশ কিছু সুবিধা রয়েছে। উদাহরণ স্বরূপ:

  • এটি বড় মূল্যায়ন মডেল এবং মূল্যায়ন ডেটাসেট সমর্থন করতে পারে যা একটি একক কাজ ধরে রাখতে পারে না।
  • মূল্যায়নের ফলাফল পরবর্তী যুগের প্রশিক্ষণের সিদ্ধান্ত নিতে ব্যবহার করা যেতে পারে।

ইনলাইন মূল্যায়ন বাস্তবায়নের দুটি উপায় রয়েছে: সরাসরি মূল্যায়ন এবং বিতরণ মূল্যায়ন।

  • প্রত্যক্ষ মূল্যায়ন : ছোট মডেল এবং মূল্যায়ন ডেটাসেটের জন্য, সমন্বয়কারী সরাসরি বিতরণকৃত মডেলে সমন্বয়কারীর মূল্যায়ন ডেটাসেটের সাথে মূল্যায়ন চালাতে পারে:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • বিতরণকৃত মূল্যায়ন : বড় মডেল বা ডেটাসেটগুলির জন্য যেগুলি সরাসরি সমন্বয়কারীতে চালানো অসম্ভব, সমন্বয়কারীর কাজটি ClusterCoordinator.schedule / ClusterCoordinator.join পদ্ধতির মাধ্যমে কর্মীদের মূল্যায়নের কাজগুলি বিতরণ করতে পারে:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

সাইড-কার মূল্যায়ন

আরেকটি পদ্ধতিকে সাইড-কার মূল্যায়ন বলা হয় যেখানে আপনি একটি ডেডিকেটেড মূল্যায়নকারী কাজ তৈরি করেন যা বারবার চেকপয়েন্ট পড়ে এবং একটি সর্বশেষ চেকপয়েন্টে মূল্যায়ন চালায়। মূল্যায়ন ফলাফলের উপর ভিত্তি করে আপনার প্রশিক্ষণের লুপ পরিবর্তন করার প্রয়োজন না হলে এটি আপনার প্রশিক্ষণ প্রোগ্রামটি তাড়াতাড়ি শেষ করার অনুমতি দেয়। যাইহোক, এটি মূল্যায়ন ট্রিগার করার জন্য একটি অতিরিক্ত মূল্যায়নকারীর কাজ এবং পর্যায়ক্রমিক চেকপয়েন্টিং প্রয়োজন। নিম্নলিখিত একটি সম্ভাব্য পার্শ্ব-কার মূল্যায়ন লুপ:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

বাস্তব বিশ্বের ক্লাস্টার

একটি বাস্তব উত্পাদন পরিবেশে, আপনি বিভিন্ন মেশিনে বিভিন্ন প্রক্রিয়ায় সমস্ত কাজ চালাবেন। প্রতিটি টাস্কে ক্লাস্টার তথ্য কনফিগার করার সবচেয়ে সহজ উপায় হল "TF_CONFIG" পরিবেশ ভেরিয়েবল সেট করা এবং "TF_CONFIG" পার্স করার জন্য একটি tf.distribute.cluster_resolver.TFConfigClusterResolver ব্যবহার করা।

"TF_CONFIG" এনভায়রনমেন্ট ভেরিয়েবল সম্পর্কে একটি সাধারণ বর্ণনার জন্য, ডিস্ট্রিবিউটেড ট্রেনিং গাইড দেখুন।

আপনি যদি Kubernetes বা অন্যান্য কনফিগারেশন টেমপ্লেটগুলি ব্যবহার করে আপনার প্রশিক্ষণের কাজগুলি শুরু করেন, তাহলে খুব সম্ভবত এই টেমপ্লেটগুলি ইতিমধ্যেই আপনার জন্য “TF_CONFIG" সেট করেছে৷

"TF_CONFIG" পরিবেশ পরিবর্তনশীল সেট করুন

ধরুন আপনার 3 জন কর্মী এবং 2টি প্যারামিটার সার্ভার আছে, কর্মী 1 এর "TF_CONFIG" হতে পারে:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

মূল্যায়নকারীর "TF_CONFIG" হতে পারে:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

মূল্যায়নকারীর জন্য উপরের "TF_CONFIG" স্ট্রিংয়ের "cluster" অংশটি ঐচ্ছিক।

আপনি যদি সমস্ত কাজের জন্য একই বাইনারি ব্যবহার করেন

আপনি যদি একটি একক বাইনারি ব্যবহার করে এই সমস্ত কাজগুলি চালাতে পছন্দ করেন, তাহলে আপনাকে আপনার প্রোগ্রামের শাখাটিকে একেবারে শুরুতে বিভিন্ন ভূমিকাতে দিতে হবে:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

নিম্নলিখিত কোডটি একটি টেনসরফ্লো সার্ভার শুরু করে এবং অপেক্ষা করে:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

হ্যান্ডলিং টাস্ক ব্যর্থতা

কর্মীর ব্যর্থতা

tf.distribute.experimental.coordinator.ClusterCoordinator বা Model.fit কর্মীদের ব্যর্থতার জন্য বিল্ট-ইন ফল্ট টলারেন্স প্রদান করে। কর্মী পুনরুদ্ধার করার পরে, পূর্বে প্রদত্ত ডেটাসেট ফাংশন (হয় একটি কাস্টম প্রশিক্ষণ লুপের জন্য ClusterCoordinator.create_per_worker_dataset , অথবা tf.keras.utils.experimental.DatasetCreator for Model.fit ) ডেটাসেটগুলি পুনরায় তৈরি করার জন্য কর্মীদের কাছে আহ্বান করা হবে৷

প্যারামিটার সার্ভার বা সমন্বয়কারী ব্যর্থতা

যাইহোক, যখন সমন্বয়কারী একটি প্যারামিটার সার্ভার ত্রুটি দেখে, তখন এটি অবিলম্বে একটি UnavailableError বা AbortedError উত্থাপন করবে। আপনি এই ক্ষেত্রে সমন্বয়কারী পুনরায় চালু করতে পারেন। সমন্বয়কারী নিজেও অনুপলব্ধ হতে পারে। অতএব, প্রশিক্ষণের অগ্রগতি না হারানোর জন্য নির্দিষ্ট টুলিংয়ের সুপারিশ করা হয়:

  • Model.fit এর জন্য, আপনার একটি BackupAndRestore কলব্যাক ব্যবহার করা উচিত, যা স্বয়ংক্রিয়ভাবে অগ্রগতি সংরক্ষণ এবং পুনরুদ্ধার পরিচালনা করে। একটি উদাহরণের জন্য উপরে কলব্যাক এবং প্রশিক্ষণ বিভাগ দেখুন।

  • একটি কাস্টম ট্রেনিং লুপের জন্য, আপনাকে পর্যায়ক্রমে মডেল ভেরিয়েবল চেকপয়েন্ট করা উচিত এবং ট্রেনিং শুরু হওয়ার আগে একটি চেকপয়েন্ট থেকে মডেল ভেরিয়েবল লোড করা উচিত। প্রশিক্ষণের অগ্রগতি প্রায় optimizer.iterations থেকে অনুমান করা যেতে পারে যদি একটি অপ্টিমাইজার চেকপয়েন্ট করা হয়:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

একটি RemoteValue হচ্ছে

একটি ফাংশন সফলভাবে সম্পাদিত হলে একটি RemoteValue আনয়ন নিশ্চিত করা হয়। কারণ বর্তমানে একটি ফাংশন কার্যকর করার পর রিটার্ন মানটি সমন্বয়কারীর কাছে অবিলম্বে অনুলিপি করা হয়। অনুলিপি চলাকালীন কোন কর্মী ব্যর্থ হলে, ফাংশনটি অন্য উপলব্ধ কর্মীর উপর পুনরায় চেষ্টা করা হবে। অতএব, আপনি যদি পারফরম্যান্সের জন্য অপ্টিমাইজ করতে চান তবে আপনি রিটার্ন মান ছাড়াই ফাংশনগুলি নির্ধারণ করতে পারেন।

ভূল প্রতিবেদন

একবার সমন্বয়কারী প্যারামিটার সার্ভার থেকে UnavailableError বা InvalidArgument থেকে একটি InvalidArgument এর মতো অন্যান্য অ্যাপ্লিকেশন ত্রুটির মতো একটি ত্রুটি দেখতে tf.debugging.check_numerics , ত্রুটিটি উত্থাপন করার আগে এটি সমস্ত মুলতুবি এবং সারিবদ্ধ ফাংশন বাতিল করবে। তাদের সংশ্লিষ্ট RemoteValue গুলি আনার ফলে একটি CancelledError ত্রুটি দেখা দেবে।

একটি ত্রুটি উত্থাপিত হওয়ার পরে, সমন্বয়কারী একই ত্রুটি বা বাতিল ফাংশন থেকে কোনো ত্রুটি উত্থাপন করবে না।

কর্মক্ষমতা বৃদ্ধি

আপনি ParameterServerStrategy এবং ClusterResolver সাথে প্রশিক্ষণের সময় কর্মক্ষমতা সংক্রান্ত সমস্যাগুলি দেখতে পেলে বেশ কয়েকটি সম্ভাব্য কারণ রয়েছে।

একটি সাধারণ কারণ হল প্যারামিটার সার্ভারের ভারসাম্যহীন লোড রয়েছে এবং কিছু ভারী-লোড করা প্যারামিটার সার্ভার ক্ষমতায় পৌঁছেছে। এছাড়াও একাধিক মূল কারণ থাকতে পারে। এই সমস্যা প্রশমিত করার কিছু সহজ পদ্ধতি হল:

  1. একটি ParameterServerStrategy তৈরি করার সময় একটি variable_partitioner নির্দিষ্ট করার মাধ্যমে আপনার বড় মডেলের ভেরিয়েবলগুলিকে ভাগ করুন।
  2. একটি হটস্পট ভেরিয়েবল তৈরি করা এড়িয়ে চলুন যা সম্ভব হলে একটি একক ধাপে সমস্ত প্যারামিটার সার্ভারের জন্য প্রয়োজনীয়। উদাহরণস্বরূপ, অপ্টিমাইজারগুলিতে একটি ধ্রুবক শেখার হার বা সাবক্লাস tf.keras.optimizers.schedules.LearningRateSchedule ব্যবহার করুন যেহেতু ডিফল্ট আচরণ হল যে শেখার হার একটি নির্দিষ্ট প্যারামিটার সার্ভারে রাখা একটি পরিবর্তনশীল হয়ে যাবে এবং প্রতিটি ধাপে অন্য সমস্ত প্যারামিটার সার্ভার দ্বারা অনুরোধ করা হবে। .
  3. কেরাস প্রিপ্রসেসিং লেয়ারে পাঠানোর আগে আপনার বড় শব্দভান্ডার এলোমেলো করুন।

পারফরম্যান্স সমস্যার আরেকটি সম্ভাব্য কারণ হল সমন্বয়কারী। আপনার schedule / join প্রথম বাস্তবায়ন পাইথন-ভিত্তিক এবং এইভাবে থ্রেডিং ওভারহেড থাকতে পারে। এছাড়াও সমন্বয়কারী এবং কর্মীদের মধ্যে লেটেন্সি বড় হতে পারে। এই যদি হয় তাহলে,

  • Model.fit এর জন্য, আপনি Model.compile এ প্রদত্ত steps_per_execution আর্গুমেন্ট 1-এর চেয়ে বড় মান সেট করতে পারেন।

  • একটি কাস্টম প্রশিক্ষণ লুপের জন্য, আপনি একটি একক tf.function এ একাধিক ধাপ প্যাক করতে পারেন:

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

যেহেতু লাইব্রেরিটি আরও অপ্টিমাইজ করা হয়েছে, আশা করি বেশিরভাগ ব্যবহারকারীকে ভবিষ্যতে ম্যানুয়ালি পদক্ষেপগুলি প্যাক করতে হবে না৷

উপরন্তু, কর্মক্ষমতা উন্নতির জন্য একটি ছোট কৌশল হল রিটার্ন মান ছাড়াই ফাংশনের সময় নির্ধারণ করা, যেমনটি উপরে হ্যান্ডলিং টাস্ক ব্যর্থতা বিভাগে ব্যাখ্যা করা হয়েছে।

পরিচিত সীমাবদ্ধতা

বেশিরভাগ পরিচিত সীমাবদ্ধতাগুলি ইতিমধ্যেই উপরের বিভাগে কভার করা হয়েছে। এই বিভাগে একটি সারসংক্ষেপ প্রদান করে.

ParameterServerStrategy সাধারণ

  • os.environment["grpc_fail_fast"]="use_caller" সমন্বয়কারী সহ প্রতিটি কাজের জন্য প্রয়োজন, যাতে ত্রুটি সহনশীলতা সঠিকভাবে কাজ করে।
  • সিঙ্ক্রোনাস প্যারামিটার সার্ভার প্রশিক্ষণ সমর্থিত নয়।
  • সর্বোত্তম কর্মক্ষমতা অর্জনের জন্য সাধারণত একটি একক ফাংশনে একাধিক ধাপ প্যাক করা প্রয়োজন।
  • tf.saved_model.load এর মাধ্যমে শার্ড ভেরিয়েবল সহ একটি saved_model লোড করা সমর্থিত নয়। দ্রষ্টব্য TensorFlow সার্ভিং ব্যবহার করে এই ধরনের একটি সংরক্ষিত_মডেল লোড করা কাজ করবে বলে আশা করা হচ্ছে।
  • শার্ডেড অপ্টিমাইজার স্লট ভেরিয়েবল সহ একটি ভিন্ন সংখ্যক শার্ডে চেকপয়েন্ট লোড করা সমর্থিত নয়।
  • সমন্বয়কারীর টাস্ক রিস্টার্ট না করে প্যারামিটার সার্ভারের ব্যর্থতা থেকে পুনরুদ্ধার করা সমর্থিত নয়।
  • tf.lookup.StaticHashTable এর ব্যবহার (যা সাধারণত কিছু কেরাস প্রিপ্রসেসিং লেয়ার দ্বারা নিযুক্ত করা হয়, যেমন tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup , এবং tf.keras.layers.TextVectorization ) রিসোর্সে স্থাপন করা হয় প্যারামিটার সার্ভার প্রশিক্ষণের সাথে এই সময়ে সমন্বয়কারী। কর্মীদের থেকে সমন্বয়কারী পর্যন্ত RPC-এর সন্ধানের জন্য এর কার্যক্ষমতার প্রভাব রয়েছে। এই ঠিকানা একটি বর্তমান উচ্চ অগ্রাধিকার.

Model.fit সুনির্দিষ্ট

  • steps_per_epochModel.fit আর্গুমেন্ট প্রয়োজন। আপনি একটি মান নির্বাচন করতে পারেন যা একটি যুগে উপযুক্ত ব্যবধান প্রদান করে।
  • পারফরম্যান্সের কারণে ব্যাচ-লেভেল কল আছে এমন কাস্টম কলব্যাকগুলির জন্য ParameterServerStrategy সার্ভার স্ট্র্যাটেজির সমর্থন নেই৷ আপনার সেই কলগুলিকে উপযুক্তভাবে বাছাই steps_per_epoch সহ যুগ-স্তরের কলে রূপান্তর করা উচিত, যাতে সেগুলিকে প্রতিটি steps_per_epoch নম্বর বলা হয়। অন্তর্নির্মিত কলব্যাকগুলি প্রভাবিত হয় না: তাদের ব্যাচ-স্তরের কলগুলি কার্যকরী হওয়ার জন্য সংশোধন করা হয়েছে৷ ParameterServerStrategy জন্য সমর্থনকারী ব্যাচ-স্তরের কলের পরিকল্পনা করা হচ্ছে।
  • একই কারণে, অন্যান্য কৌশলগুলির বিপরীতে, অগ্রগতি বার এবং মেট্রিক্স শুধুমাত্র যুগের সীমানায় লগ করা হয়।
  • run_eagerly সমর্থিত নয়।

কাস্টম প্রশিক্ষণ লুপ সুনির্দিষ্ট