آموزش سرور پارامتر با ParameterServerStrategy

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

آموزش سرور پارامتر یک روش متداول داده موازی برای افزایش مقیاس آموزش مدل در چندین ماشین است.

یک خوشه آموزشی سرور پارامتر شامل کارگران و سرورهای پارامتر است. متغیرها بر روی سرورهای پارامتر ایجاد می شوند و در هر مرحله توسط کارگران خوانده و به روز می شوند. به طور پیش فرض، کارگران این متغیرها را به طور مستقل و بدون همگام سازی با یکدیگر می خوانند و به روز می کنند. به همین دلیل است که گاهی اوقات آموزش به سبک سرور پارامتر را آموزش ناهمزمان می نامند.

در TensorFlow 2، آموزش سرور پارامتر توسط کلاس tf.distribute.experimental.ParameterServerStrategy ، که مراحل آموزش را در یک خوشه توزیع می کند که تا هزاران کارگر (همراه با سرورهای پارامتر) مقیاس می شود.

روش های آموزشی پشتیبانی شده

دو روش اصلی آموزشی پشتیبانی شده وجود دارد:

خوشه ای با مشاغل و وظایف

صرف نظر از API انتخابی ( Model.fit یا یک حلقه آموزشی سفارشی)، آموزش توزیع شده در TensorFlow 2 شامل: یک 'cluster' با چندین 'jobs' است و هر یک از کارها ممکن است یک یا چند 'tasks' داشته باشد.

هنگام استفاده از آموزش سرور پارامتر، توصیه می شود:

  • یک شغل هماهنگ کننده (که دارای نام شغلی chief است)
  • چندین شغل کارگری (نام شغل worker )؛ و
  • کارهای سرور چند پارامتری (نام شغل ps )

در حالی که هماهنگ کننده منابع ایجاد می کند، وظایف آموزشی را ارسال می کند، نقاط بازرسی می نویسد، و با خرابی کارها سروکار دارد، کارگران و سرورهای پارامتر tf.distribute.Server را اجرا می کنند که به درخواست های هماهنگ کننده گوش می دهند.

آموزش سرور پارامتر با Model.fit API

آموزش سرور پارامتر با Model.fit API به هماهنگ کننده نیاز دارد که از یک شی tf.distribute.experimental.ParameterServerStrategy و یک tf.keras.utils.experimental.DatasetCreator به عنوان ورودی استفاده کند. مشابه استفاده از Model.fit بدون استراتژی، یا با سایر استراتژی‌ها، گردش کار شامل ایجاد و کامپایل مدل، آماده‌سازی تماس‌های برگشتی و به دنبال آن Model.fit است.

آموزش سرور پارامتر با حلقه آموزشی سفارشی

با حلقه های آموزشی سفارشی، کلاس tf.distribute.experimental.coordinator.ClusterCoordinator جزء کلیدی مورد استفاده برای هماهنگ کننده است.

مهم ترین API ارائه شده توسط شی ClusterCoordinator schedule است:

  • API schedule زمانی یک tf.function را در صف قرار می دهد و بلافاصله یک RemoteValue شبیه به آینده را برمی گرداند.
  • توابع در صف به کارگران راه دور در رشته های پس زمینه فرستاده می شوند و RemoteValue آنها به صورت ناهمزمان پر می شود.
  • از آنجایی که schedule زمان بندی نیازی به تخصیص کارگر ندارد، تابع tf.function منتقل شده می تواند بر روی هر کارگر موجود اجرا شود.
  • اگر کارگری که روی آن اجرا می‌شود قبل از تکمیل آن در دسترس نباشد، این تابع روی کارگر موجود دیگری دوباره امتحان می‌شود.
  • به دلیل این واقعیت و این واقعیت که اجرای تابع اتمی نیست، یک تابع ممکن است بیش از یک بار اجرا شود.

علاوه بر ارسال توابع راه دور، ClusterCoordinator همچنین به ایجاد مجموعه داده‌ها بر روی همه کارگران و بازسازی این مجموعه داده‌ها در زمانی که یک کارگر از شکست بهبود می‌یابد، کمک می‌کند.

راه اندازی آموزش

این آموزش به Model.fit و مسیرهای حلقه آموزشی سفارشی منشعب می شود و شما می توانید متناسب با نیاز خود یکی را انتخاب کنید. بخش هایی غیر از "آموزش با X" برای هر دو مسیر قابل اجرا هستند.

pip install portpicker

راه اندازی خوشه

همانطور که در بالا ذکر شد، یک کلاستر آموزشی سرور پارامتر به یک کار هماهنگ کننده نیاز دارد که برنامه آموزشی شما را اجرا می کند، یک یا چند کارگر و وظایف سرور پارامتر که سرورهای tf.distribute.Server - tf.distribute.Server - و احتمالاً یک کار ارزیابی اضافی که ارزیابی ماشین جانبی را اجرا می کند، نیاز دارد. (به بخش ارزیابی خودروی جانبی در زیر مراجعه کنید). الزامات تنظیم آنها عبارتند از:

  • وظیفه هماهنگ کننده باید آدرس ها و پورت های سایر سرورهای TensorFlow را به جز ارزیاب بداند.
  • کارگران و سرورهای پارامتر باید بدانند که به کدام پورت باید گوش دهند. برای سادگی، معمولاً هنگام ایجاد سرورهای TensorFlow در این وظایف، می‌توانید اطلاعات خوشه‌ای کامل را وارد کنید.
  • وظیفه ارزیاب نیازی به دانستن تنظیمات خوشه آموزشی ندارد. اگر این کار را کرد، نباید سعی کند به کلاستر آموزشی متصل شود.
  • کارگران و سرورهای پارامتر باید به ترتیب دارای انواع وظایف به عنوان "worker" و "ps" باشند. هماهنگ کننده باید به دلایل قدیمی از "chief" به عنوان نوع کار استفاده کند.

در این آموزش شما یک کلاستر در فرآیند ایجاد می کنید تا کل آموزش سرور پارامتر در Colab اجرا شود. نحوه راه اندازی کلاسترهای واقعی را در بخش بعدی یاد خواهید گرفت.

خوشه در حال فرآیند

شما با ایجاد چندین سرور TensorFlow از قبل شروع کرده و بعداً به آنها متصل خواهید شد. توجه داشته باشید که این فقط برای نمایش این آموزش است و در آموزش واقعی سرورها بر روی ماشین های "worker" و "ps" راه اندازی می شوند.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

تنظیم خوشه در فرآیند اغلب در آزمایش واحد استفاده می شود، مانند اینجا .

یکی دیگر از گزینه‌های آزمایش محلی، راه‌اندازی فرآیندها در ماشین محلی است—برای نمونه‌ای از این رویکرد ، آموزش Multi-worker با Keras را بررسی کنید.

یک ParameterServerStrategy را نمونه سازی کنید

قبل از اینکه وارد کد آموزشی شوید، بیایید یک شی ParameterServerStrategy را نمونه سازی کنیم. توجه داشته باشید که این بدون در نظر گرفتن اینکه آیا شما با Model.fit یا یک حلقه آموزشی سفارشی ادامه می دهید، مورد نیاز است. آرگومان variable_partitioner در بخش Variable Sharding توضیح داده خواهد شد.

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

به منظور استفاده از GPU برای آموزش، GPUهای قابل مشاهده برای هر کارگر را اختصاص دهید. ParameterServerStrategy از تمام GPU های موجود در هر کارگر استفاده می کند، با این محدودیت که همه کارگران باید به همان تعداد GPU در دسترس باشند.

اشتراک گذاری متغیر

تقسیم بندی متغیر به تقسیم یک متغیر به چندین متغیر کوچکتر اشاره دارد که به آن هارد می گویند . اشتراک گذاری متغیر ممکن است برای توزیع بار شبکه هنگام دسترسی به این خرده ها مفید باشد. همچنین توزیع محاسبات و ذخیره سازی یک متغیر عادی در سرورهای چند پارامتری مفید است.

برای فعال کردن اشتراک گذاری variable_partitioner ، می توانید هنگام ساخت یک شی ParameterServerStrategy ، یک متغیر_partitioner را ارسال کنید. متغیر_partitioner هر بار که یک variable_partitioner ایجاد می‌شود فراخوانی می‌شود و انتظار می‌رود که تعداد خرده‌ها را در هر بعد از متغیر برگرداند. برخی از variable_partitioner partitioner خارج از جعبه مانند tf.distribute.experimental.partitioners.MinSizePartitioner ارائه شده است. توصیه می شود از پارتیشن های مبتنی بر اندازه مانند tf.distribute.experimental.partitioners.MinSizePartitioner برای جلوگیری از پارتیشن بندی متغیرهای کوچک استفاده کنید که می تواند تأثیر منفی بر سرعت آموزش مدل داشته باشد.

هنگامی که یک variable_partitioner ارسال می‌شود و اگر یک متغیر را مستقیماً تحت strategy.scope() ایجاد کنید، به یک نوع ظرف با ویژگی variables تبدیل می‌شود که دسترسی به لیست خرده‌ها را فراهم می‌کند. در بیشتر موارد، این ظرف به طور خودکار با به هم پیوستن تمام خرده ها به یک Tensor تبدیل می شود. در نتیجه می توان از آن به عنوان یک متغیر عادی استفاده کرد. از سوی دیگر، برخی از روش‌های TensorFlow مانند tf.nn.embedding_lookup پیاده‌سازی کارآمدی را برای این نوع کانتینر فراهم می‌کنند و در این روش‌ها از الحاق خودکار اجتناب می‌شود.

لطفاً برای جزئیات بیشتر به اسناد API tf.distribute.experimental.ParameterServerStrategy مراجعه کنید.

آموزش با Model.fit

Keras یک API آموزشی با کاربرد آسان را از طریق Model.fit که حلقه آموزشی زیر کاپوت را کنترل می‌کند، با انعطاف‌پذیری train_step ، و تماس‌های برگشتی، که قابلیت‌هایی مانند ذخیره نقطه بازرسی یا ذخیره خلاصه برای TensorBoard را فراهم می‌کند. با Model.fit ، می توان از همان کد آموزشی برای استراتژی های دیگر با تعویض ساده شی استراتژی استفاده کرد.

داده های ورودی

Model.fit با آموزش سرور پارامتر مستلزم آن است که داده های ورودی در یک فراخوانی ارائه شود که یک آرگومان منفرد از نوع tf.distribute.InputContext را می گیرد و یک tf.data.Dataset را برمی گرداند. سپس، یک شی tf.keras.utils.experimental.DatasetCreator که چنین callable فراخوانی را می گیرد و یک شی اختیاری tf.distribute.InputOptions را از طریق آرگومان input_options کنید.

توجه داشته باشید که توصیه می‌شود داده‌ها را با آموزش سرور پارامتر به هم بزنید و تکرار کنید و در فراخوانی fit ، steps_per_epoch را مشخص کنید تا کتابخانه مرزهای دوره را بشناسد.

لطفاً برای اطلاعات بیشتر در مورد آرگومان InputContext به آموزش ورودی توزیع شده مراجعه کنید.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

کد موجود در dataset_fn در دستگاه ورودی که معمولاً CPU است، در هر یک از ماشین‌های کارگر فراخوانی می‌شود.

ساخت و تدوین مدل

اکنون، یک tf.keras.Model -یک مدل tf.keras.models.Sequential بی‌اهمیت برای اهداف نمایشی ایجاد می‌کنید که با یک Model.compile برای ترکیب اجزایی مانند بهینه‌ساز، معیارها، یا پارامترهایی مانند steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

پاسخ به تماس و آموزش

قبل از اینکه model.fit را برای آموزش واقعی فراخوانی کنید، بیایید فراخوان های مورد نیاز را برای کارهای رایج آماده کنیم، مانند:

  • ModelCheckpoint : برای ذخیره وزن مدل.
  • BackupAndRestore : برای اطمینان از اینکه از پیشرفت آموزش به طور خودکار نسخه پشتیبان تهیه می شود و در صورت عدم دسترسی به خوشه (مانند سقط یا preemption) بازیابی می شود. یا
  • TensorBoard : برای ذخیره گزارش های پیشرفت در فایل های خلاصه، که در ابزار TensorBoard تجسم می شوند.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

استفاده مستقیم با ClusterCoordinator (اختیاری)

حتی اگر مسیر آموزشی Model.fit را انتخاب کنید، می‌توانید به صورت اختیاری یک شی tf.distribute.experimental.coordinator.ClusterCoordinator را برای برنامه‌ریزی سایر توابع که می‌خواهید روی کارگران اجرا شوند، نمونه‌سازی کنید. برای جزئیات و مثال های بیشتر به بخش آموزش با حلقه آموزش سفارشی مراجعه کنید.

آموزش با یک حلقه آموزشی سفارشی

استفاده از حلقه های آموزشی سفارشی با tf.distribute.Strategy انعطاف پذیری زیادی را برای تعریف حلقه های آموزشی فراهم می کند. با ParameterServerStrategy که در بالا تعریف شده است (به عنوان strategy )، از tf.distribute.experimental.coordinator.ClusterCoordinator برای ارسال اجرای مراحل آموزشی به کارگران راه دور استفاده خواهید کرد.

سپس، همانطور که در حلقه آموزشی با سایر tf.distribute.Strategy انجام داده اید، یک مدل ایجاد می کنید، یک مجموعه داده و یک تابع مرحله تعریف می کنید. جزئیات بیشتر را می توانید در آموزش سفارشی با آموزش tf.distribute.Strategy بیابید .

برای اطمینان از واکشی کارآمد مجموعه داده، از APIهای ایجاد مجموعه داده توزیع شده توصیه شده که در مراحل آموزش اعزام به کارگران راه دور در زیر ذکر شده است استفاده کنید. همچنین، حتماً با Strategy.run در داخل worker_fn تماس بگیرید تا از مزایای کامل پردازنده‌های گرافیکی اختصاص داده شده به کارگران استفاده کنید. بقیه مراحل برای آموزش با یا بدون GPU یکسان است.

بیایید این اجزا را در مراحل زیر ایجاد کنیم:

داده ها را تنظیم کنید

ابتدا، تابعی بنویسید که مجموعه داده ای را ایجاد می کند که شامل منطق پیش پردازش پیاده سازی شده توسط لایه های پیش پردازش Keras است .

شما این لایه‌ها را خارج از dataset_fn ایجاد می‌کنید، اما تبدیل را در داخل مجموعه dataset_fn اعمال می‌کنید، زیرا مجموعه dataset_fn را در یک tf.function قرار می‌دهید، که اجازه نمی‌دهد متغیرها در داخل آن ایجاد شوند.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

نمونه های اسباب بازی را در یک مجموعه داده ایجاد کنید:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

سپس، مجموعه داده آموزشی پیچیده شده در یک dataset_fn را ایجاد کنید:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

مدل را بسازید

بعد، مدل و سایر اشیاء را ایجاد کنید. مطمئن شوید که همه متغیرها را تحت strategy.scope ایجاد کرده اید.

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

اجازه دهید تأیید کنیم که استفاده از FixedShardsPartitioner همه متغیرها را به دو قطعه تقسیم می‌کند و هر قطعه به سرورهای پارامتر متفاوتی اختصاص می‌یابد:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

مرحله آموزش را تعریف کنید

سوم، مرحله آموزشی را در یک tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

در تابع مرحله آموزشی بالا، فراخوانی Strategy.run و Strategy.reduce در step_fn می‌تواند چندین GPU را برای هر کارگر پشتیبانی کند. اگر به کارگران GPU تخصیص داده شده باشد، Strategy.run مجموعه داده ها را بر روی چندین نسخه توزیع می کند.

اعزام مراحل آموزشی به کارگران از راه دور

پس از اینکه همه محاسبات توسط ParameterServerStrategy تعریف شدند، از کلاس tf.distribute.experimental.coordinator.ClusterCoordinator برای ایجاد منابع و توزیع مراحل آموزشی برای کارگران راه دور استفاده خواهید کرد.

بیایید ابتدا یک شی ClusterCoordinator ایجاد کنیم و شی استراتژی را وارد کنیم:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

سپس، یک مجموعه داده برای هر کارگر و یک تکرار کننده ایجاد کنید. در per_worker_dataset_fn زیر، قرار دادن مجموعه dataset_fn در strategy.distribute_datasets_from_function توصیه می‌شود تا امکان واکشی اولیه کارآمد به GPUها را فراهم کند.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

مرحله آخر توزیع محاسبات بین کارگران راه دور با استفاده از ClusterCoordinator.schedule :

  • متد schedule یک tf.function را در صف قرار می دهد و بلافاصله یک RemoteValue شبیه آینده را برمی گرداند. توابع صف به کارمندان راه دور در رشته های پس زمینه فرستاده می شوند و RemoteValue به صورت ناهمزمان پر می شود.
  • از متد join ( ClusterCoordinator.join ) می توان برای صبر کردن تا اجرای تمام توابع برنامه ریزی شده استفاده کرد.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

در اینجا نحوه واکشی نتیجه RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

از طرف دیگر، می توانید تمام مراحل را راه اندازی کنید و در حالی که منتظر تکمیل هستید، کاری انجام دهید:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

برای آموزش کامل و گردش کار برای این مثال خاص، لطفاً این تست را بررسی کنید.

اطلاعات بیشتر در مورد ایجاد مجموعه داده

مجموعه داده در کد بالا با استفاده از ClusterCoordinator.create_per_worker_dataset API ایجاد شده است. برای هر کارگر یک مجموعه داده ایجاد می کند و یک شی ظرف را برمی گرداند. می‌توانید متد iter را برای ایجاد یک تکرارکننده برای هر کارگر فراخوانی کنید. تکرارکننده هر کارگر شامل یک تکرارکننده به ازای هر کارگر است و قطعه مربوط به یک کارگر در آرگومان ورودی تابع که قبل از اجرای تابع بر روی یک کارگر خاص به متد ClusterCoordinator.schedule ، جایگزین می‌شود.

در حال حاضر، روش ClusterCoordinator.schedule فرض می‌کند که کارگران معادل هستند و بنابراین فرض می‌کند که مجموعه داده‌ها روی کارگران مختلف یکسان هستند، با این تفاوت که اگر حاوی یک عملیات Dataset.shuffle باشند، ممکن است به‌طور متفاوتی با هم ترکیب شوند. به همین دلیل، همچنین توصیه می شود که مجموعه داده ها به طور نامحدود تکرار شوند و به جای تکیه بر OutOfRangeError از یک مجموعه داده، تعداد محدودی از مراحل را برنامه ریزی کنید.

نکته مهم دیگر این است که مجموعه داده‌های tf.data از سریال‌سازی و سریال‌زدایی ضمنی در سراسر مرزهای وظیفه پشتیبانی نمی‌کنند. بنابراین مهم است که کل مجموعه داده را در داخل تابع ارسال شده به ClusterCoordinator.create_per_worker_dataset کنید.

ارزیابی

بیش از یک راه برای تعریف و اجرای یک حلقه ارزیابی در آموزش توزیع شده وجود دارد. هر کدام مزایا و معایب خاص خود را دارند که در زیر توضیح داده شده است. اگر ترجیحی ندارید، روش ارزیابی درون خطی توصیه می شود.

ارزیابی درون خطی

در این روش هماهنگ کننده بین آموزش و ارزشیابی متناوب می شود و از این رو به آن ارزشیابی درون خطی می گویند.

چندین مزیت ارزیابی درون خطی وجود دارد. مثلا:

  • می‌تواند از مدل‌های ارزیابی بزرگ و مجموعه داده‌های ارزیابی که یک وظیفه نمی‌تواند انجام دهد، پشتیبانی کند.
  • از نتایج ارزیابی می توان برای تصمیم گیری برای آموزش دوره بعدی استفاده کرد.

دو روش برای اجرای ارزیابی درون خطی وجود دارد: ارزیابی مستقیم و ارزیابی توزیع شده.

  • ارزیابی مستقیم : برای مدل‌های کوچک و مجموعه داده‌های ارزیابی، هماهنگ‌کننده می‌تواند ارزیابی را مستقیماً روی مدل توزیع‌شده با مجموعه داده ارزیابی روی هماهنگ‌کننده اجرا کند:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • ارزیابی توزیع‌شده : برای مدل‌ها یا مجموعه‌های داده بزرگ که اجرای مستقیم روی هماهنگ‌کننده غیرممکن است، وظیفه هماهنگ‌کننده می‌تواند وظایف ارزیابی را از طریق روش‌های ClusterCoordinator.schedule / ClusterCoordinator.join بین کارگران توزیع کند:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

ارزیابی خودروی جانبی

روش دیگری به نام ارزیابی خودروی جانبی است که در آن یک کار ارزیاب اختصاصی ایجاد می‌کنید که به طور مکرر نقاط بازرسی را می‌خواند و ارزیابی را در آخرین ایست بازرسی انجام می‌دهد. اگر نیازی به تغییر حلقه آموزشی خود بر اساس نتایج ارزیابی ندارید، این امکان را به برنامه آموزشی شما می دهد تا زودتر تمام شود. با این حال، برای شروع ارزیابی به یک کار ارزیاب اضافی و بازرسی دوره ای نیاز دارد. در زیر یک حلقه ارزیابی احتمالی خودروی جانبی وجود دارد:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

خوشه ها در دنیای واقعی

در یک محیط تولید واقعی، شما تمامی وظایف را در فرآیندهای مختلف بر روی ماشین های مختلف اجرا خواهید کرد. ساده ترین راه برای پیکربندی اطلاعات کلاستر در هر کار، تنظیم متغیرهای محیطی "TF_CONFIG" و استفاده از tf.distribute.cluster_resolver.TFConfigClusterResolver برای تجزیه "TF_CONFIG" است.

برای توضیحات کلی در مورد متغیرهای محیطی "TF_CONFIG" ، به راهنمای آموزشی Distributed مراجعه کنید.

اگر کارهای آموزشی خود را با استفاده از Kubernetes یا سایر قالب های پیکربندی شروع کنید، به احتمال زیاد این الگوها قبلاً “TF_CONFIG" را برای شما تنظیم کرده اند.

متغیر محیطی "TF_CONFIG" را تنظیم کنید

فرض کنید 3 کارگر و 2 سرور پارامتر دارید، "TF_CONFIG" کارگر 1 می تواند باشد:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" ارزیاب می تواند:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

بخش "cluster" در رشته "TF_CONFIG" بالا برای ارزیاب اختیاری است.

اگر از یک باینری برای همه کارها استفاده کنید

اگر ترجیح می دهید همه این وظایف را با استفاده از یک باینری واحد اجرا کنید، باید در همان ابتدا اجازه دهید برنامه خود به نقش های مختلف منشعب شود:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

کد زیر یک سرور TensorFlow را راه اندازی می کند و منتظر می ماند:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

رسیدگی به شکست کار

شکست کارگر

tf.distribute.experimental.coordinator.ClusterCoordinator یا Model.fit تحمل خطای داخلی را برای خرابی کارگر فراهم می کند. پس از بازیابی کارگر، تابع مجموعه داده‌ای که قبلا ارائه شده بود (یا به ClusterCoordinator.create_per_worker_dataset برای یک حلقه آموزشی سفارشی، یا tf.keras.utils.experimental.DatasetCreator برای Model.fit ) روی کارگران برای ایجاد مجدد مجموعه‌های داده فراخوانی می‌شود.

سرور پارامتر یا هماهنگ کننده خراب است

با این حال، هنگامی که هماهنگ کننده یک خطای سرور پارامتر را می بیند، بلافاصله یک UnavailableError یا AbortedError را ایجاد می کند. در این مورد می توانید هماهنگ کننده را مجددا راه اندازی کنید. خود هماهنگ کننده نیز ممکن است در دسترس نباشد. بنابراین، ابزار خاصی برای از دست ندادن پیشرفت آموزش توصیه می شود:

  • برای Model.fit ، باید از BackupAndRestore callback استفاده کنید، که ذخیره و بازیابی پیشرفت را به طور خودکار مدیریت می کند. برای مثال به بخش پاسخ به تماس و آموزش در بالا مراجعه کنید.

  • برای یک حلقه آموزشی سفارشی، باید متغیرهای مدل را به صورت دوره ای بررسی کنید و متغیرهای مدل را در صورت وجود، قبل از شروع آموزش، از یک نقطه بازرسی بارگیری کنید. اگر یک بهینه‌ساز چک پوینت باشد، می‌توان پیشرفت آموزش را تقریباً از optimizer.iterations استنباط کرد:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

واکشی RemoteValue

اگر یک تابع با موفقیت اجرا شود، واکشی RemoteValue تضمین می شود که موفقیت آمیز باشد. این به این دلیل است که در حال حاضر مقدار بازگشتی بلافاصله پس از اجرای یک تابع به هماهنگ کننده کپی می شود. اگر در حین کپی هر گونه خرابی کارگر وجود داشته باشد، این تابع روی کارگر موجود دیگری امتحان می شود. بنابراین، اگر می‌خواهید برای عملکرد بهینه‌سازی کنید، می‌توانید توابع را بدون مقدار بازگشتی زمان‌بندی کنید.

گزارش خطا

هنگامی که هماهنگ کننده خطایی مانند UnavailableError از سرورهای پارامتر یا سایر خطاهای برنامه مانند InvalidArgument از tf.debugging.check_numerics را ببیند، قبل از افزایش خطا، همه توابع در انتظار و صف را لغو می کند. واکشی RemoteValue مربوط به آنها یک CancelledError ایجاد می کند.

پس از مطرح شدن یک خطا، هماهنگ کننده همان خطا یا خطای توابع لغو شده را مطرح نخواهد کرد.

بهبود عملکرد

اگر هنگام تمرین با ParameterServerStrategy و ClusterResolver ، مشکلات عملکردی را مشاهده کردید، چندین دلیل ممکن وجود دارد.

یکی از دلایل رایج این است که سرورهای پارامتر دارای بار نامتعادل هستند و برخی از سرورهای پارامتر با بارگذاری سنگین به ظرفیت رسیده اند. همچنین می تواند دلایل ریشه ای متعددی داشته باشد. چند روش ساده برای کاهش این مشکل عبارتند از:

  1. هنگام ساخت یک ParameterServerStrategy ، متغیرهای مدل بزرگ خود را از طریق تعیین یک variable_partitioner تقسیم کنید.
  2. در صورت امکان از ایجاد یک متغیر hotspot که توسط همه سرورهای پارامتر مورد نیاز است در یک مرحله خودداری کنید. به عنوان مثال، از یک نرخ یادگیری ثابت یا زیر کلاس tf.keras.optimizers.schedules.LearningRateSchedule در بهینه سازها استفاده کنید زیرا رفتار پیش فرض این است که نرخ یادگیری به متغیری تبدیل می شود که روی یک سرور پارامتر خاص قرار می گیرد و توسط تمام سرورهای پارامتر دیگر در هر مرحله درخواست می شود. .
  3. واژگان بزرگ خود را قبل از ارسال آنها به لایه های پیش پردازش Keras با هم مخلوط کنید.

یکی دیگر از دلایل احتمالی مشکلات عملکرد، هماهنگ کننده است. اولین پیاده‌سازی schedule / join شما مبتنی بر پایتون است و بنابراین ممکن است دارای سربار نخ باشد. همچنین تأخیر بین هماهنگ کننده و کارگران می تواند زیاد باشد. اگر این مورد است،

  • برای Model.fit ، می توانید آرگومان steps_per_execution ارائه شده در Model.compile را روی مقداری بزرگتر از 1 تنظیم کنید.

  • برای یک حلقه آموزشی سفارشی، می توانید چندین مرحله را در یک tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

از آنجایی که کتابخانه بیشتر بهینه شده است، امیدواریم اکثر کاربران مجبور نباشند در آینده مراحل را به صورت دستی بسته بندی کنند.

علاوه بر این، یک ترفند کوچک برای بهبود عملکرد، برنامه‌ریزی عملکردها بدون مقدار بازگشتی است، همانطور که در بخش شکست کار مدیریت در بالا توضیح داده شد.

محدودیت های شناخته شده

بیشتر محدودیت های شناخته شده قبلاً در بخش های بالا پوشش داده شده است. این بخش خلاصه ای را ارائه می دهد.

ParameterServerStrategy کلی

  • os.environment["grpc_fail_fast"]="use_caller" برای هر کار از جمله هماهنگ کننده مورد نیاز است تا تحمل خطا به درستی کار کند.
  • آموزش سرور پارامتر همزمان پشتیبانی نمی شود.
  • معمولاً برای دستیابی به عملکرد بهینه لازم است چندین مرحله در یک عملکرد واحد جمع آوری شود.
  • بارگذاری saved_model از طریق tf.saved_model.load حاوی متغیرهای خرد شده پشتیبانی نمی شود. توجه داشته باشید که بارگیری چنین saved_model با استفاده از TensorFlow Serving انتظار می رود کار کند.
  • بارگذاری یک ایست بازرسی حاوی متغیرهای شکاف بهینه ساز خرد شده در تعداد متفاوتی از قطعات پشتیبانی نمی شود.
  • برای بازیابی از خرابی سرور پارامتر بدون راه اندازی مجدد کار هماهنگ کننده پشتیبانی نمی شود.
  • استفاده از tf.lookup.StaticHashTable (که معمولاً توسط برخی از لایه‌های پیش‌پردازش Keras استفاده می‌شود، مانند tf.keras.layers.IntegerLookup ، tf.keras.layers.StringLookup ، و tf.keras.layers.TextVectorization ) منجر به منابعی می‌شود که روی آنها قرار می‌گیرند. هماهنگ کننده در این زمان با آموزش سرور پارامتر. این پیامدهای عملکردی برای جستجوی RPC از کارگران به هماهنگ کننده دارد. رسیدگی به این موضوع در حال حاضر اولویت بالایی دارد.

مشخصات Model.fit

  • آرگومان steps_per_epoch در Model.fit مورد نیاز است. می توانید مقداری را انتخاب کنید که فواصل مناسب را در یک دوره ارائه دهد.
  • ParameterServerStrategy از تماس های سفارشی که تماس های سطح دسته ای به دلایل عملکرد دارند پشتیبانی نمی کند. شما باید آن تماس‌ها را با استفاده از steps_per_epoch به تماس‌های سطح دوره تبدیل کنید، به طوری که هر تعداد steps_per_epoch نامیده می‌شوند. تماس‌های داخلی تحت تأثیر قرار نمی‌گیرند: تماس‌های سطح دسته‌ای آن‌ها به گونه‌ای اصلاح شده‌اند که عملکردی داشته باشند. پشتیبانی از تماس های سطح دسته ای برای ParameterServerStrategy در حال برنامه ریزی است.
  • به همین دلیل، بر خلاف سایر استراتژی‌ها، نوار پیشرفت و معیارها فقط در مرزهای دوره ثبت می‌شوند.
  • run_eagerly پشتیبانی نمی شود.

مشخصات حلقه آموزشی سفارشی