روز جامعه ML 9 نوامبر است! برای به روز رسانی از TensorFlow، JAX به ما بپیوندید، و بیشتر بیشتر بدانید

آموزش سرور پارامتر با ParameterServerStrategy

مشاهده در TensorFlow.org در Google Colab اجرا کنید مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

آموزش سرور پارامتر از روش داده موازی مشترک به مقیاس آموزش مدل بر روی چندین ماشین است.

خوشه آموزش سرور پارامتر مشتمل بر کارگران و سرور پارامتر. متغیرها بر روی سرورهای پارامتر ایجاد می شوند و توسط کارگران در هر مرحله خوانده و به روز می شوند. به طور پیش فرض ، کارگران بدون همگام سازی با یکدیگر این متغیرها را به طور مستقل می خوانند و به روز می کنند. به همین دلیل است که گاهی اوقات پارامتر آموزش سرور به سبک آموزش ناهمزمان نامیده می شود.

در TensorFlow 2، آموزش سرور پارامتر توسط طراحی tf.distribute.experimental.ParameterServerStrategy کلاس، که به یک خوشه که در مقیاسهای بالا به هزاران نفر از کارگران توزیع مراحل تمرین (همراه با سرور پارامتر).

روشهای آموزشی پشتیبانی شده

دو روش آموزش اصلی پشتیبانی شده وجود دارد:

خوشه ای با مشاغل و وظایف

صرف نظر از API انتخاب ( Model.fit و یا یک حلقه آموزش سفارشی)، آموزش توزیع شده در TensorFlow 2 شامل: یک 'cluster' با 'jobs' ، و هر یک از مشاغل ممکن است یک یا بیشتر داشته باشد 'tasks' .

هنگام استفاده از آموزش سرور پارامتر ، توصیه می شود موارد زیر را داشته باشید:

  • یک کار هماهنگ کننده (که نام کار chief )
  • شغل کارگر چندگانه (کار نام worker )؛ و
  • شغل سرور پارامتر های متعدد (کار نام ps )

در حالی که هماهنگ کننده منابع، اعزام آموزش وظایف ایجاد، می نویسد پست های بازرسی، و معاملات با شکست کار، کارگران و سرور پارامتر اجرا tf.distribute.Server که برای درخواست از هماهنگ کننده گوش می دهند.

آموزش سرور پارامتر با Model.fit API

پارامتر آموزش سرور با Model.fit API نیاز به هماهنگ کننده به استفاده از یک tf.distribute.experimental.ParameterServerStrategy شیء و یک tf.keras.utils.experimental.DatasetCreator به عنوان ورودی می باشد. مشابه به Model.fit استفاده با هیچ استراتژی، و یا با استراتژی های دیگر، گردش کار شامل ایجاد و تدوین مدل، آماده سازی تماس مجدد، به دنبال یک Model.fit پاسخ.

آموزش پارامتر سرور با یک حلقه آموزشی سفارشی

با حلقه های آموزش سفارشی، tf.distribute.experimental.coordinator.ClusterCoordinator کلاس جزء کلیدی مورد استفاده برای هماهنگ است.

بیشترین API مهم ارائه شده توسط ClusterCoordinator شی است schedule :

  • schedule API enqueues tf.function و آینده مانند گرداند RemoteValue بلافاصله.
  • توابع صف قرار خواهند گرفت به کارگران از راه دور در موضوعات پس زمینه اعزام و خود RemoteValue خواهد بود ناهمگام پر شده است.
  • از آنجا که schedule کند انتساب کارگر نیاز نیست، tf.function گذشت در می توان در هر کارگر در دسترس اجرا می شود.
  • اگر کارگری که بر روی آن اجرا شده است قبل از اتمام کار در دسترس قرار نگیرد ، این کار مجدداً روی یک کارگر موجود دیگر امتحان می شود.
  • به دلیل این واقعیت و این واقعیت که عملکرد تابع اتمی نیست ، ممکن است یک تابع بیش از یک بار اجرا شود.

علاوه بر اعزام توابع از راه دور، ClusterCoordinator نیز کمک می کند به ایجاد مجموعه داده در همه کارگران و بازسازی این مجموعه داده ها هنگامی که یک کارگر بازیابی از شکست.

راه اندازی آموزش

آموزش را به شعبه Model.fit و سفارشی مسیر آموزش حلقه، و شما می توانید یکی که متناسب با نیازهای خود را انتخاب کنید. بخشهای غیر از "آموزش با X" در هر دو مسیر قابل اجرا است.

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

راه اندازی خوشه

همانطور که در بالا ذکر شد، یک خوشه پارامتر آموزش سرور نیاز به یک کار هماهنگ کننده که برنامه خود را آموزش، یک یا چند کارگر و وظایف سرور پارامتر است که اجرا TensorFlow سرورهای اجرا می شود tf.distribute.Server دنده احتمالا کار ارزیابی اضافی که ارزیابی اجرا می شود جانبی خودرو (بخش ارزیابی خودروهای جانبی را در زیر ببینید). الزامات راه اندازی آنها عبارتند از:

  • وظیفه هماهنگ کننده باید آدرس ها و پورت های دیگر سرورهای TensorFlow به جز ارزیاب را بداند.
  • کارگران و سرورهای پارامتر باید بدانند که به کدام پورت باید گوش دهند. به منظور سادگی ، معمولاً هنگام ایجاد سرورهای TensorFlow در این وظایف ، می توانید اطلاعات خوشه ای کامل را وارد کنید.
  • وظیفه ارزیاب نیازی به اطلاع از نحوه تنظیم خوشه آموزشی ندارد. اگر چنین است ، نباید سعی کند به خوشه آموزشی متصل شود.
  • کارگران و سرور پارامتر باید انواع کار به عنوان "worker" و "ps" به ترتیب،. هماهنگ کننده باید استفاده کنید "chief" را به عنوان نوع کار به دلایل میراث.

در این آموزش ، شما یک خوشه در حال ساخت ایجاد می کنید تا کل آموزش سرور پارامترها در Colab اجرا شود. شما خواهید آموخت که چگونه به راه اندازی خوشه واقعی در بخش بعد.

خوشه در حال انجام

شما با ایجاد چندین سرور TensorFlow از قبل شروع کرده و بعداً به آنها متصل می شوید. توجه داشته باشید که این فقط برای هدف تظاهرات این آموزش، و در آموزش واقعی سرور در آغاز شود "worker" و "ps" ماشین آلات.

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

راه اندازی خوشه ای در فرایند اغلب در تست واحد مانند استفاده می شود، در اینجا .

یکی دیگر از گزینه برای آزمایش محلی است برای راه اندازی فرآیندهای در محلی ماشین چک کردن آموزش چند کارگر با Keras برای مثال از این روش است.

از ParameterServerStrategy استفاده کنید

قبل از اینکه شما را به کد آموزش شیرجه رفتن، اجازه دهید یک نمونه از یک ParameterServerStrategy شی. توجه داشته باشید که این صرف نظر از اینکه شما با اقدام مورد نیاز است Model.fit و یا یک حلقه آموزش سفارشی. variable_partitioner استدلال خواهد شد در توضیح بخش sharding متغیر .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

به منظور استفاده از GPU ها برای آموزش ، GPU هایی را که برای هر کارگر قابل مشاهده است ، اختصاص دهید. ParameterServerStrategy خواهد تمام پردازنده های گرافیکی موجود در هر کارگر استفاده کنید، با محدودیت که تمامی کارگران باید به همان تعداد از GPU ها در دسترس است.

خرد کردن متغیر

sharding متغیر اشاره به تقسیم یک متغیر به متغیر های متعدد کوچکتر، که ذرات نامیده می شود. خرد کردن متغیر ممکن است برای توزیع بار شبکه هنگام دسترسی به این تکه ها مفید باشد. توزیع محاسبه و ذخیره یک متغیر معمولی در سرورهای چند پارامتری نیز مفید است.

برای فعال کردن sharding متغیر، شما می توانید در یک پاس variable_partitioner که ساخت یک ParameterServerStrategy شی. variable_partitioner خواهد شد استناد هر زمانی که یک متغیر ایجاد شده است و انتظار می رود به بازگشت تعداد ذرات در امتداد هر یک از ابعاد متغیر است. برخی خارج از جعبه variable_partitioner به عنوان چنین ارائه شده است tf.distribute.experimental.partitioners.MinSizePartitioner . توصیه می شود برای استفاده از partitioners مبتنی بر اندازه مانند tf.distribute.experimental.partitioners.MinSizePartitioner برای جلوگیری از پارتیشن بندی متغیرهای کوچک، که می تواند تاثیر منفی بر سرعت آموزش مدل است.

هنگامی که یک variable_partitioner در گذشت و اگر شما یک متغیر ایجاد کنید به طور مستقیم تحت strategy.scope() ، آن را تبدیل به یک نوع ظرف را با یک variables اموال فراهم می کند که دسترسی به لیست از خرده ریزهای. در بیشتر موارد ، این ظرف به صورت خودکار با اتصال همه تکه ها به Tensor تبدیل می شود. در نتیجه می توان از آن به عنوان یک متغیر معمولی استفاده کرد. از سوی دیگر، برخی از روش های TensorFlow مانند tf.nn.embedding_lookup ارائه اجرای کارآمد برای این نوع ظرف و در این روش الحاق خودکار نیز جلوگیری خواهد شد.

لطفا اسناد API از دیدن tf.distribute.experimental.ParameterServerStrategy برای جزئیات بیشتر.

آموزش با Model.fit

Keras از طریق یک API آموزش آسان برای استفاده و Model.fit که دستگیره حلقه آموزش در زیر هود، با انعطاف پذیری overridable train_step ، و تماس، که ارائه ویژگی های از قبیل صرفه جویی در پاسگاه ایست و بازرسی و یا خلاصه صرفه جویی برای TensorBoard. با Model.fit ، کد آموزش همان را می توان برای استراتژی های دیگر با یک مبادله ساده شی استراتژی استفاده می شود.

داده های ورودی

Model.fit با آموزش سرور پارامتر مستلزم آن است که داده های ورودی در یک قابل فراخوانی است که یک آرگومان از نوع ارائه tf.distribute.InputContext ، و برمی گرداند tf.data.Dataset . سپس، ایجاد tf.keras.utils.experimental.DatasetCreator شی که طول می کشد چنین callable ، و اختیاری tf.distribute.InputOptions طریق اعتراض input_options استدلال است.

توجه داشته باشید که توصیه می شود به زدن و داده ها را با آموزش سرور پارامتر تکرار، و مشخص steps_per_epoch در fit پاسخ بنابراین کتابخانه می داند مرزهای عصر است.

لطفا دیدن ورودی توزیع آموزش برای اطلاعات بیشتر در مورد InputContext استدلال است.

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

کد امنیتی را در dataset_fn خواهد شد بر روی دستگاه ورودی، که معمولا پردازنده، در هر یک از ماشین آلات کارگر استناد شده است.

ساخت و تدوین مدل

در حال حاضر، شما یک ایجاد tf.keras.Model بی اهمیت -a tf.keras.models.Sequential مدل برای تظاهرات اهداف به دنبال یک Model.compile پاسخ به ترکیب قطعات، مانند بهینه ساز، متریک، و یا پارامترهای مانند steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

تماس و آموزش

قبل از اینکه شما پاسخ model.fit برای آموزش واقعی، اجازه دهید آماده سازی تماس مجدد مورد نیاز برای کارهای معمول، مانند:

  • ModelCheckpoint : برای صرفه جویی در وزن مدل.
  • BackupAndRestore : تا مطمئن شوید که پیشرفت آموزشی به طور خودکار حمایت کردن، و اگر بهبود در دسترس نبودن تجربه خوشه (مانند سقط جنین و یا پیشدستی)؛ یا
  • TensorBoard : برای صرفه جویی در گزارش پیشرفت به فایل های خلاصه، که در ابزار TensorBoard قابل مشاهده است.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

استفاده مستقیم با ClusterCoordinator (اختیاری)

حتی اگر شما را انتخاب کنید Model.fit مسیر آموزش، شما می توانید به صورت اختیاری یک نمونه از یک tf.distribute.experimental.coordinator.ClusterCoordinator شی به برنامه توابع دیگر شما می خواهم به بر کارگران اجرا شود. مراجعه کنید آموزش با آموزش حلقه های سفارشی بخش برای جزئیات بیشتر و نمونه های.

آموزش با یک حلقه آموزشی سفارشی

با استفاده از حلقه آموزش سفارشی با tf.distribute.Strategy انعطاف پذیری زیادی برای تعریف حلقه آموزش فراهم می کند. با ParameterServerStrategy بالا (به عنوان تعریف strategy )، شما یک استفاده tf.distribute.experimental.coordinator.ClusterCoordinator به اعزام اجرای مراحل آموزش به کارگران از راه دور.

سپس، شما یک مدل ایجاد کنید، تعریف یک مجموعه داده و یک تابع پله، به عنوان شما را در حلقه آموزش با دیگر انجام داده اند tf.distribute.Strategy است. شما می توانید جزئیات بیشتری پیدا کنید آموزش سفارشی با tf.distribute.Strategy آموزش.

برای اطمینان از واکشی اولیه مجموعه داده کارآمد، استفاده از مجموعه داده رابط های برنامه کاربردی ایجاد ذکر شده در توزیع توصیه می شود مراحل آموزش اعزام به کارگران از راه دور بخش زیر است. همچنین، مطمئن شوید که به پاسخ Strategy.run داخل worker_fn به بهره برداری کامل از GPU ها اختصاص داده شده به کارگران است. بقیه مراحل برای آموزش با یا بدون GPU یکسان است.

بیایید این اجزا را در مراحل زیر ایجاد کنیم:

داده ها را تنظیم کنید

اول، نوشتن یک تابع است که ایجاد یک مجموعه داده است که شامل پیش پردازش منطق های اجرا شده توسط Keras لایه از پیش پردازش .

شما خواهید این لایه ها در خارج از ایجاد dataset_fn داخل، اما اعمال تحول dataset_fn ، از شما را به قرار دادن dataset_fn به یک tf.function ، که اجازه نمی دهد متغیرهای به داخل آن ایجاد می شود.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

نمونه های اسباب بازی را در مجموعه داده ایجاد کنید:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

سپس، ایجاد مجموعه داده آموزش پیچیده شده در یک dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

مدل را بسازید

در مرحله بعد ، مدل و سایر اشیاء را ایجاد کنید. اطمینان حاصل کنید که برای ایجاد تمام متغیرهای تحت strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

بیایید تأیید می کند که استفاده از FixedShardsPartitioner تمام متغیرها به دو تکه تقسیم شده و هر ربان به سرورهای پارامتر های مختلف اختصاص داده بود:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

مرحله آموزش را مشخص کنید

سوم، ایجاد گام آموزش پیچیده را به یک tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

در تابع پله آموزش بالا، خواستار Strategy.run و Strategy.reduce در step_fn می توانید GPU های متعدد برای هر کارگر حمایت می کنند. اگر کارگران GPU ها اختصاص داده شده، Strategy.run خواهد مجموعه داده در کپی های متعدد توزیع کنید.

ارسال مراحل آموزشی به کارگران دور افتاده

پس از همه محاسبات تعریف ParameterServerStrategy ، شما را به استفاده از tf.distribute.experimental.coordinator.ClusterCoordinator کلاس به ایجاد منابع و توزیع مراحل آموزش به کارگران از راه دور.

اجازه بدهید که ایجاد ClusterCoordinator شی و با تصویب در جسم استراتژی:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

سپس ، یک مجموعه داده برای هر کارگر و یک تکرار کننده ایجاد کنید. در per_worker_dataset_fn زیر، بسته بندی dataset_fn به strategy.distribute_datasets_from_function توصیه می شود برای اجازه واکشی اولیه کارآمد برای GPU ها یکپارچه.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

گام نهایی این است برای توزیع محاسبات به کارگران از راه دور با استفاده از ClusterCoordinator.schedule :

  • schedule روش enqueues tf.function و آینده مانند گرداند RemoteValue بلافاصله. توابع صف قرار خواهند گرفت به کارگران از راه دور در موضوعات پس زمینه اعزام و RemoteValue خواهد شد ناهمگام پر شده است.
  • join روش ( ClusterCoordinator.join ) را می توان مورد استفاده قرار گیرد به صبر کنید تا تمام توابع برنامه ریزی اجرا می شوند.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

در اینجا این است که چگونه شما می توانید نتیجه یک واکشی RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

متناوبا ، می توانید تمام مراحل را اجرا کرده و در حالی که منتظر اتمام هستید ، کاری انجام دهید:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

برای آموزش کامل و خدمت گردش کار برای این مثال خاص، لطفا به این آزمون .

اطلاعات بیشتر در مورد ایجاد مجموعه داده

مجموعه اطلاعات، کد بالا را با استفاده از ایجاد شده است ClusterCoordinator.create_per_worker_dataset API). برای هر کارگر یک مجموعه داده ایجاد می کند و یک شیء کانتینر را برمی گرداند. شما می توانید پاسخ iter روش در آن برای ایجاد یک تکرارکننده هر کارگر. در هر کارگر تکرارکننده شامل یک تکرارکننده ازای هر کارگر و برش متناظر یک کارگر خواهد شد در آرگومان ورودی از تابع به تعویض ClusterCoordinator.schedule روش قبل از تابع در یک کارگر خاص اجرا می شود.

در حال حاضر، ClusterCoordinator.schedule روش فرض کارگران معادل هستند و در نتیجه فرض مجموعه داده در کارگران مختلف یکسان هستند به جز آنها ممکن است متفاوت حوصلگی اگر آنها حاوی یک Dataset.shuffle عملیات. از آنجا که این، آن را نیز توصیه می شود که مجموعه داده به طور نامحدود تکرار شود و به شما برنامه یک تعداد متناهی از مرحله به جای تکیه بر OutOfRangeError از یک مجموعه داده.

نکته مهم دیگر این است که tf.data مجموعه داده نمی ترتیب ضمنی و deserialization در سراسر مرزهای وظیفه پشتیبانی نمی کند. بنابراین مهم است که به ایجاد کل داده در داخل تابع به تصویب رسید به ClusterCoordinator.create_per_worker_dataset .

ارزیابی

بیش از یک روش برای تعریف و اجرای حلقه ارزیابی در آموزش های توزیع شده وجود دارد. هر کدام مزایا و معایب خاص خود را دارد که در زیر توضیح داده شده است. اگر ترجیح ندارید ، روش ارزیابی داخلی توصیه می شود.

ارزیابی درون خطی

در این روش، بطور متناوب هماهنگ کننده بین آموزش و ارزیابی و نتیجه آن است که آن را به نام ارزیابی های درون خطی.

ارزیابی داخلی مزایای متعددی دارد. مثلا:

  • این می تواند از مدلهای ارزشیابی بزرگ و مجموعه داده های ارزیابی که یک وظیفه واحد قادر به نگهداری آنها نیست پشتیبانی کند.
  • از نتایج ارزیابی می توان برای تصمیم گیری در مورد دوره بعدی استفاده کرد.

دو روش برای پیاده سازی ارزیابی درون خطی وجود دارد: ارزیابی مستقیم و ارزیابی توزیع شده.

  • ارزیابی مستقیم: برای مدل های کوچک و مجموعه داده های ارزیابی، هماهنگ کننده می توانید ارزیابی طور مستقیم بر روی مدل توزیع شده با مجموعه داده های ارزیابی در هماهنگ کننده را اجرا کنید:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • ارزیابی توزیع: برای مدل های بزرگ و یا مجموعه داده که غیر ممکن به اجرا به طور مستقیم در هماهنگ کننده، وظیفه هماهنگ کننده می تواند وظایف ارزیابی به کارگران از طریق توزیع ClusterCoordinator.schedule / ClusterCoordinator.join روش ها:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

ارزیابی خودروهای جانبی

روش دیگر به نام ارزیابی جانبی خودرو که در آن شما یک کار ارزیاب اختصاصی ایجاد می کند که بارها و بارها خواند پست های بازرسی و اجرا می شود ارزیابی در شدن پاسگاه. اگر نیازی به تغییر حلقه آموزشی خود بر اساس نتایج ارزیابی ندارید ، برنامه آموزشی شما زود به پایان می رسد. با این حال ، برای انجام ارزیابی نیاز به یک کار ارزیاب اضافی و یک بازرسی دوره ای است. در زیر حلقه احتمالی ارزیابی خودروهای جانبی آمده است:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

خوشه ها در دنیای واقعی

در یک محیط تولید واقعی ، همه وظایف را در فرایندهای مختلف بر روی ماشین های مختلف اجرا خواهید کرد. ساده ترین راه برای اطلاعات خوشه پیکربندی در هر وظیفه این است که مجموعه "TF_CONFIG" متغیرهای محیطی و استفاده از یک tf.distribute.cluster_resolver.TFConfigClusterResolver به تجزیه "TF_CONFIG" .

برای توضیحات کلی در مورد "TF_CONFIG" متغیرهای محیطی، به مراجعه آموزش توزیع راهنمای.

اگر شما شروع به وظایف آموزشی خود را با استفاده از Kubernetes یا دیگر قالب پیکربندی، آن است که به احتمال بسیار زیاد است که این قالب در حال حاضر راه “TF_CONFIG" برای شما.

تنظیم "TF_CONFIG" متغیر محیطی

فرض کنید شما 3 کارگران و 2 سرور پارامتر، "TF_CONFIG" کارگر 1 می تواند:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" از ارزیاب می تواند:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" در بالا "TF_CONFIG" رشته برای ارزیاب اختیاری است.

اگر از باینری یکسان برای همه وظایف استفاده می کنید

اگر ترجیح می دهید همه این وظایف را با استفاده از یک فایل باینری انجام دهید ، باید برنامه خود را در ابتدا به نقش های مختلف تقسیم کنید:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

کد زیر سرور TensorFlow را راه اندازی کرده و منتظر می ماند:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

رسیدگی به شکست کار

شکست کارگر

tf.distribute.experimental.coordinator.ClusterCoordinator یا Model.fit ساخته شده است در ارائه تحمل خطا برای شکست کارگر. تخلیهء کارگر، تابع قبلا ارائه مجموعه داده (یا به ClusterCoordinator.create_per_worker_dataset برای یک حلقه آموزش سفارشی، و یا tf.keras.utils.experimental.DatasetCreator برای Model.fit ) خواهد شد و در کارگران به استناد دوباره ایجاد مجموعه داده.

خرابی سرور پارامتر یا هماهنگ کننده

با این حال، زمانی که هماهنگ کننده می بیند یک خطای سرور پارامتر، آن را به بالا بردن UnavailableError یا AbortedError بلافاصله. در این صورت می توانید هماهنگ کننده را مجدداً راه اندازی کنید. خود هماهنگ کننده نیز می تواند در دسترس نباشد. بنابراین ، برای از دست ندادن پیشرفت آموزشی ، ابزارهای خاصی توصیه می شود:

  • برای Model.fit ، شما باید یک استفاده BackupAndRestore پاسخ به تماس، که صرفه جویی در پیشرفت و بازسازی به صورت خودکار دسته. مشاهده callback در و آموزش قسمت بالا برای مثال.

  • برای یک حلقه آموزشی سفارشی ، باید متغیرهای مدل را به صورت دوره ای چک کنید و متغیرهای مدل را از یک نقطه بازرسی ، در صورت وجود ، قبل از شروع آموزش بارگذاری کنید. پیشرفت آموزش را می توان تقریبا از استنباط optimizer.iterations اگر یک بهینه ساز checkpointed است:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

واکشی یک RemoteValue

واکشی یک RemoteValue تضمین شده است برای موفقیت اگر یک تابع با موفقیت اجرا شده است. زیرا در حال حاضر مقدار بازگشتی بلافاصله پس از اجرای یک تابع در هماهنگ کننده کپی می شود. در صورت خرابی کارگر در حین کپی ، عملکرد مجدداً روی کارگر موجود دیگر امتحان می شود. بنابراین ، اگر می خواهید عملکرد را بهینه کنید ، می توانید توابع را بدون مقدار برگشت برنامه ریزی کنید.

گزارش خطا

هنگامی که هماهنگ کننده خطایی مانند بیند UnavailableError از سرورهای پارامتر و یا سایر خطاهای برنامهای مانند InvalidArgument از tf.debugging.check_numerics ، آن را به همه توابع انتظار و صف قبل از بالا بردن خطا را لغو نمایید. در حال واکشی مربوط به خود را RemoteValue بازدید کنندگان یک افزایش CancelledError .

پس از ایجاد خطا ، هماهنگ کننده همان خطا یا خطایی را در عملکردهای لغو شده مطرح نمی کند.

بهبود عملکرد

دلایل متعددی ممکن است اگر شما مسائل مربوط به عملکرد ببینید زمانی که شما با قطار وجود دارد ParameterServerStrategy و ClusterResolver .

یکی از دلایل رایج این است که سرورهای پارامتر دارای بار نامتعادل هستند و برخی از سرورهای پارامترهای دارای بار زیاد به ظرفیت رسیده اند. همچنین ممکن است دلایل ریشه ای متعددی وجود داشته باشد. برخی از روشهای ساده برای کاهش این مشکل عبارتند از:

  1. شارد متغیرهای مدل بزرگ خود را از طریق تعیین یک variable_partitioner که ساخت یک ParameterServerStrategy .
  2. در صورت امکان از ایجاد یک متغیر hotspot که مورد نیاز همه سرورهای پارامتر در یک مرحله است اجتناب کنید. به عنوان مثال، استفاده از نرخ یادگیری ثابت یا زیر کلاس tf.keras.optimizers.schedules.LearningRateSchedule در بهینه از رفتار پیش فرض این است که نرخ یادگیری را تبدیل به یک متغیر در هر مرحله قرار داده شده در سرور پارامتر خاص و درخواست شده توسط تمام سرورهای پارامتر های دیگر به
  3. واژگان بزرگ خود را قبل از انتقال به لایه های پیش پردازش Keras ، تغییر دهید.

یکی دیگر از دلایل احتمالی مسائل مربوط به عملکرد ، هماهنگ کننده است. اولین اجرای خود را از schedule / join است بر اساس پایتون و در نتیجه ممکن است نخ در بالای سر. همچنین تأخیر بین هماهنگ کننده و کارگران می تواند زیاد باشد. اگر این مورد است،

  • برای Model.fit ، شما می توانید steps_per_execution استدلال در ارائه Model.compile به یک مقدار بزرگتر از 1.

  • برای یک حلقه آموزش سفارشی، شما می توانید مراحل متعدد را به یک بسته tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

با بهینه سازی بیشتر کتابخانه ، امیدواریم اکثر کاربران در آینده مجبور نباشند مراحل را به صورت دستی بسته بندی کنند.

علاوه بر این ، یک ترفند کوچک برای بهبود عملکرد ، برنامه ریزی توابع بدون مقدار برگشتی است ، همانطور که در قسمت شکست کار در بالا توضیح داده شد.

محدودیت های شناخته شده

اکثر محدودیت های شناخته شده قبلاً در قسمت های بالا ذکر شده است. این بخش خلاصه ای ارائه می دهد.

ParameterServerStrategy عمومی

  • os.environment["grpc_fail_fast"]="use_caller" در هر کار از جمله هماهنگ کننده مورد نیاز است، به کار تحمل خطا به درستی.
  • آموزش سرور پارامتر سنکرون پشتیبانی نمی شود.
  • معمولاً برای دستیابی به عملکرد مطلوب ، باید چندین مرحله را در یک عملکرد واحد قرار داد.
  • آن را پشتیبانی نمی برای بارگذاری یک saved_model طریق tf.saved_model.load حاوی متغیرهای sharded. توجه داشته باشید که بارگذاری چنین مدل ذخیره شده با استفاده از سرویس TensorFlow کار می کند.
  • بارگیری یک ایست بازرسی حاوی متغیرهای شکاف بهینه ساز خرد شده در تعداد متفاوتی از قطعات پشتیبانی نمی شود.
  • بازیابی از خرابی پارامتر سرور بدون شروع مجدد کار هماهنگ کننده پشتیبانی نمی شود.
  • استفاده از tf.lookup.StaticHashTable (که معمولا توسط برخی از به کار tf.keras.layers.experimental.preprocessing لایه ها، مانند IntegerLookup ، StringLookup و TextVectorization نتایج در منابع در هماهنگ کننده در این زمان با آموزش سرور پارامتر قرار می گیرد). این امر دارای پیامدهای عملکردی برای جستجوی RPC ها از کارگران تا هماهنگ کننده است. پرداختن به این مهمترین اولویت فعلی است.

Model.fit جزئیات

  • steps_per_epoch استدلال در مورد نیاز است Model.fit . می توانید مقداری را انتخاب کنید که فواصل مناسب را در یک دوره ارائه دهد.
  • ParameterServerStrategy می کند پشتیبانی از تماس مجدد سفارشی که تماس های در سطح گروه به دلایل عملکرد ندارد. شما باید آن دسته از تماس به تماس های سطح عصر با مناسب برداشت تبدیل steps_per_epoch ، به طوری که آنها هر نام steps_per_epoch تعداد مراحل. تماس های داخلی ساخته شده تحت تأثیر قرار نمی گیرند: تماس های دسته ای آنها به منظور عملکرد بهتر تغییر کرده است. حمایت از تماس های سطح دسته ای برای ParameterServerStrategy است که برنامه ریزی شده.
  • به همین دلیل ، بر خلاف سایر استراتژی ها ، نوار پیشرفت و معیارها فقط در مرزهای دوره ثبت می شوند.
  • run_eagerly پشتیبانی نمی شود.

مشخصات حلقه آموزشی سفارشی