การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ด้วย ParameterServerStrategy

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

ภาพรวม

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ เป็นวิธีการที่ข้อมูลขนานร่วมกันในการไต่ขึ้นการฝึกอบรมรุ่นในหลายเครื่อง

กลุ่มฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ประกอบด้วยคนงานและเซิร์ฟเวอร์พารามิเตอร์ ตัวแปรถูกสร้างขึ้นบนเซิร์ฟเวอร์พารามิเตอร์ และจะถูกอ่านและอัปเดตโดยผู้ปฏิบัติงานในแต่ละขั้นตอน โดยค่าเริ่มต้น ผู้ปฏิบัติงานจะอ่านและอัปเดตตัวแปรเหล่านี้อย่างอิสระโดยไม่ทำข้อมูลให้ตรงกัน นี่คือเหตุผลที่บางครั้งพารามิเตอร์การฝึกอบรมเซิร์ฟเวอร์แบบที่เรียกว่าการฝึกอบรมไม่ตรงกัน

ใน TensorFlow 2 การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ขับเคลื่อนโดย tf.distribute.experimental.ParameterServerStrategy ระดับซึ่งจัดจำหน่ายขั้นตอนการฝึกอบรมเพื่อคลัสเตอร์ที่ตาชั่งถึงหลายพันคนงาน (มาพร้อมกับเซิร์ฟเวอร์พารามิเตอร์)

วิธีการฝึกอบรมที่รองรับ

มีสองวิธีการฝึกอบรมที่ได้รับการสนับสนุนหลัก:

คลัสเตอร์ที่มีงานและงาน

โดยไม่คำนึงถึง API ของทางเลือก ( Model.fit หรือห่วงการฝึกอบรมที่กำหนดเอง), การฝึกอบรมการกระจายใน TensorFlow 2 เกี่ยวข้องกับการที่: 'cluster' กับหลาย 'jobs' และแต่ละงานที่อาจจะมีหนึ่งหรือมากกว่า 'tasks'

เมื่อใช้การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ ขอแนะนำให้มี:

  • อีกหนึ่งงานประสานงาน (ซึ่งมีชื่องาน chief )
  • งานคนงานหลาย (ชื่องาน worker ); และ
  • หลายงานเซิร์ฟเวอร์พารามิเตอร์ (ชื่องาน ps )

ในขณะที่ผู้ประสานงานสร้างทรัพยากรยื้อการฝึกอบรมงานเขียนจุดตรวจและเกี่ยวข้องกับความล้มเหลวของงานที่คนงานและเซิร์ฟเวอร์พารามิเตอร์ทำงาน tf.distribute.Server ที่ฟังสำหรับการร้องขอจากผู้ประสานงาน

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์กับ Model.fit API

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์กับ Model.fit API ต้องประสานงานที่จะใช้ tf.distribute.experimental.ParameterServerStrategy วัตถุและ tf.keras.utils.experimental.DatasetCreator เป็นอินพุท คล้ายกับ Model.fit ใช้กับกลยุทธ์การไม่มีหรือมีกลยุทธ์อื่น ๆ ขั้นตอนการทำงานที่เกี่ยวข้องกับการสร้างและรวบรวมรูปแบบการเตรียมความพร้อมการเรียกกลับตาม Model.fit โทร

การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์พร้อมลูปการฝึกแบบกำหนดเอง

ด้วยการฝึกอบรมลูปที่กำหนดเอง tf.distribute.experimental.coordinator.ClusterCoordinator ระดับเป็นองค์ประกอบสำคัญที่ใช้ในการประสานงาน

  • ClusterCoordinator ระดับความต้องการที่จะทำงานร่วมกับ tf.distribute.Strategy วัตถุ
  • นี้ tf.distribute.Strategy วัตถุเป็นสิ่งจำเป็นที่จะให้ข้อมูลของคลัสเตอร์และถูกนำมาใช้ในการกำหนดขั้นตอนการฝึกอบรมที่แสดงให้เห็นใน การฝึกอบรมที่กำหนดเองกับ tf.distribute.Strategy
  • ClusterCoordinator วัตถุแล้วยื้อขั้นตอนการดำเนินการของการฝึกอบรมให้กับแรงงานเหล่านี้ระยะไกล
  • สำหรับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ที่ ClusterCoordinator ต้องการที่จะทำงานกับ tf.distribute.experimental.ParameterServerStrategy

API ที่สำคัญที่สุดให้โดย ClusterCoordinator วัตถุเป็น schedule :

  • schedule API enqueues tf.function และผลตอบแทนในอนาคตเหมือน RemoteValue ทันที
  • ฟังก์ชั่นการจัดคิวจะถูกส่งไปยังคนไกลในหัวข้อพื้นหลังของพวกเขาและ RemoteValue s จะเต็มไปถ่ายทอดสด
  • เนื่องจาก schedule ไม่ต้องใช้คนงานที่ได้รับมอบหมายที่ tf.function ผ่านสามารถดำเนินการเกี่ยวกับการปฏิบัติงานใด ๆ
  • หากผู้ปฏิบัติงานที่ดำเนินการอยู่ไม่พร้อมใช้งานก่อนที่จะเสร็จสมบูรณ์ ฟังก์ชันจะถูกลองอีกครั้งกับผู้ปฏิบัติงานอื่นที่พร้อมใช้งาน
  • เนื่องจากข้อเท็จจริงนี้และความจริงที่ว่าการเรียกใช้ฟังก์ชันไม่ใช่อะตอมมิก ฟังก์ชันอาจถูกเรียกใช้งานมากกว่าหนึ่งครั้ง

นอกเหนือไปจากฟังก์ชั่นการฝึกอบรมระยะไกล ClusterCoordinator ยังช่วยในการสร้างชุดข้อมูลเกี่ยวกับคนงานทุกคนและสร้างชุดข้อมูลเหล่านี้เมื่อมีการฟื้นตัวจากความล้มเหลวของผู้ปฏิบัติงาน

ตั้งค่าการสอน

กวดวิชาจะเข้าไปในสาขา Model.fit และการฝึกอบรมที่กำหนดเองเส้นทางห่วงและคุณสามารถเลือกหนึ่งที่เหมาะกับความต้องการของคุณ ส่วนอื่นที่ไม่ใช่ "การฝึกกับ X" ใช้ได้กับทั้งสองเส้นทาง

pip install portpicker
pip uninstall tensorflow keras -y
pip install tf-nightly
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing

การตั้งค่าคลัสเตอร์

ดังกล่าวข้างต้นคลัสเตอร์พารามิเตอร์การฝึกอบรมเซิร์ฟเวอร์จำเป็นต้องเป็นงานที่ผู้ประสานงานที่รันโปรแกรมการฝึกอบรมหนึ่งหรือหลายคนงานและงานเซิร์ฟเวอร์พารามิเตอร์ที่ทำงาน TensorFlow servers- tf.distribute.Server และอื่นอาจจะเป็นงานที่ประเมินผลเพิ่มเติมที่การประเมินผลการทำงานด้านรถยนต์ (ดูส่วนการประเมินรถด้านข้างด้านล่าง) ข้อกำหนดในการตั้งค่าคือ:

  • งานผู้ประสานงานจำเป็นต้องทราบที่อยู่และพอร์ตของเซิร์ฟเวอร์ TensorFlow อื่นๆ ทั้งหมด ยกเว้นผู้ประเมิน
  • ผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์จำเป็นต้องรู้ว่าพอร์ตใดที่พวกเขาต้องรับฟัง เพื่อความง่าย คุณสามารถส่งข้อมูลคลัสเตอร์ที่สมบูรณ์เมื่อสร้างเซิร์ฟเวอร์ TensorFlow ในงานเหล่านี้
  • งานผู้ประเมินไม่จำเป็นต้องทราบการตั้งค่าของคลัสเตอร์การฝึกอบรม หากเป็นเช่นนั้น ก็ไม่ควรพยายามเชื่อมต่อกับคลัสเตอร์การฝึก
  • แรงงานและเซิร์ฟเวอร์พารามิเตอร์ควรมีงานประเภทเป็น "worker" และ "ps" ตามลำดับ ผู้ประสานงานควรใช้ "chief" เป็นประเภทงานด้วยเหตุผลมรดก

ในบทช่วยสอนนี้ คุณจะสร้างคลัสเตอร์ระหว่างดำเนินการเพื่อให้สามารถเรียกใช้การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ทั้งหมดใน Colab คุณจะได้เรียนรู้วิธีการตั้งค่า กลุ่มจริง ในส่วนต่อมา

คลัสเตอร์ในกระบวนการ

คุณจะเริ่มต้นด้วยการสร้างเซิร์ฟเวอร์ TensorFlow หลายตัวล่วงหน้าและเชื่อมต่อในภายหลัง หมายเหตุว่านี้เป็นเพียงเพื่อวัตถุประสงค์ในการสาธิตของการกวดวิชานี้และในการฝึกอบรมจริงเซิร์ฟเวอร์จะเริ่มต้นใน "worker" และ "ps" เครื่อง

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
2021-07-22 01:22:29.962567: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-07-22 01:22:29.967320: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_SYSTEM_DRIVER_MISMATCH: system has unsupported display driver / cuda driver combination
2021-07-22 01:22:29.967351: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: kokoro-gcp-ubuntu-prod-1124739887
2021-07-22 01:22:29.967434: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 470.57.2
2021-07-22 01:22:29.967458: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 465.27.0
2021-07-22 01:22:29.967464: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 465.27.0 does not match DSO version 470.57.2 -- cannot find working devices in this configuration
2021-07-22 01:22:29.971985: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.972012: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.972974: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17310
2021-07-22 01:22:29.985134: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:29.985164: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:29.985628: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:22663
2021-07-22 01:22:30.034392: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.034437: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.035565: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:17641
2021-07-22 01:22:30.044623: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.044656: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.045149: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:19682
2021-07-22 01:22:30.090235: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.090288: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.090650: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:18874

การติดตั้งคลัสเตอร์ในกระบวนการมักจะถูกใช้ในการทดสอบหน่วยเช่น ที่นี่

ตัวเลือกสำหรับการทดสอบในท้องถิ่นก็คือการเปิดตัวกระบวนการบนเครื่องตรวจสอบท้องถิ่นออกจาก การฝึกอบรมหลายงานกับ Keras ตัวอย่างของวิธีการนี้

สร้างอินสแตนซ์ ParameterServerStrategy

ก่อนที่คุณจะดำน้ำในรหัสการฝึกอบรมขอยกตัวอย่าง ParameterServerStrategy วัตถุ ทราบว่านี้เป็นสิ่งจำเป็นโดยไม่คำนึงถึงว่าคุณจะดำเนินการกับ Model.fit หรือห่วงการฝึกอบรมที่กำหนดเอง variable_partitioner อาร์กิวเมนต์จะอธิบายใน ส่วน sharding ตัวแปร

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19682', 'localhost:18874'], 'worker': ['localhost:17310', 'localhost:22663', 'localhost:17641']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:CPU:0'], variable_device = '/job:chief/replica:0/task:0/device:CPU:0'
INFO:tensorflow:Number of GPUs on workers: 0
2021-07-22 01:22:30.112542: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.112587: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.112599: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136652: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136690: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136703: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136754: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136781: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136789: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136876: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.136917: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.136931: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.136937: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:1
2021-07-22 01:22:30.136965: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:ps/replica:0/task:0
2021-07-22 01:22:30.137027: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137060: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137071: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137088: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:0
2021-07-22 01:22:30.137149: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.137185: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.137196: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.137204: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:1
2021-07-22 01:22:30.138485: I tensorflow/core/distributed_runtime/eager/eager_service_impl.cc:270] Creating sync eager service context with rendezvous_id on host kokoro-gcp-ubuntu-prod-1124739887 /job:worker/replica:0/task:2
2021-07-22 01:22:30.139971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job ps -> {0 -> localhost:19682, 1 -> localhost:18874}
2021-07-22 01:22:30.139993: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:17310, 1 -> localhost:22663, 2 -> localhost:17641}
2021-07-22 01:22:30.140000: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job chief -> {0 -> localhost:34915}
2021-07-22 01:22:30.140286: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:427] Started server with target: grpc://localhost:34915

หากต้องการใช้ GPU ในการฝึกอบรม ให้จัดสรร GPU ที่พนักงานแต่ละคนมองเห็นได้ ParameterServerStrategy จะใช้ GPUs ทั้งหมดที่มีอยู่ในแต่ละงานมีข้อ จำกัด ที่ทุกคนควรจะมีหมายเลขเดียวกันของ GPUs ที่มีอยู่

การแบ่งส่วนตัวแปร

sharding ตัวแปรหมายถึงการแยกตัวแปรลงในตัวแปรขนาดเล็กหลาย ๆ ซึ่งเรียกว่าเศษ การแบ่งกลุ่มย่อยแบบแปรผันอาจมีประโยชน์ในการกระจายโหลดของเครือข่ายเมื่อเข้าถึงชาร์ดเหล่านี้ นอกจากนี้ยังเป็นประโยชน์ในการกระจายการคำนวณและการจัดเก็บตัวแปรปกติในเซิร์ฟเวอร์พารามิเตอร์หลายตัว

ต้องการเปิดใช้งาน sharding ตัวแปรคุณสามารถส่งผ่านใน variable_partitioner เมื่อสร้าง ParameterServerStrategy วัตถุ variable_partitioner จะถูกเรียกทุกครั้งเมื่อมีตัวแปรที่ถูกสร้างขึ้นและคาดว่าจะกลับมาจำนวนเศษพร้อมมิติของตัวแปรแต่ละ บางส่วนออกจากกล่อง variable_partitioner s มีให้เช่น tf.distribute.experimental.partitioners.MinSizePartitioner ก็จะแนะนำให้ใช้ partitioners ขนาดตามชอบ tf.distribute.experimental.partitioners.MinSizePartitioner เพื่อหลีกเลี่ยงการแบ่งพาร์ทิชันตัวแปรขนาดเล็กซึ่งอาจมีผลกระทบต่อความเร็วในการฝึกอบรมรุ่น

เมื่อ variable_partitioner จะถูกส่งเข้ามาและถ้าคุณสร้างตัวแปรโดยตรงภายใต้ strategy.scope() ก็จะกลายเป็นประเภทภาชนะที่มี variables ทรัพย์สินซึ่งให้การเข้าถึงไปยังรายการของเศษ ในกรณีส่วนใหญ่ คอนเทนเนอร์นี้จะถูกแปลงเป็นเทนเซอร์โดยอัตโนมัติโดยการต่อชาร์ดทั้งหมดเข้าด้วยกัน เป็นผลให้สามารถใช้เป็นตัวแปรปกติได้ ในทางกลับกันบางวิธี TensorFlow เช่น tf.nn.embedding_lookup ให้การดำเนินงานที่มีประสิทธิภาพสำหรับภาชนะประเภทนี้และวิธีการเหล่านี้เรียงต่อกันโดยอัตโนมัติจะหลีกเลี่ยงได้

โปรดดูเอกสาร API ของ tf.distribute.experimental.ParameterServerStrategy สำหรับรายละเอียดเพิ่มเติม

การฝึกอบรมกับ Model.fit

Keras ให้การฝึกอบรมที่ง่ายต่อการใช้งาน API ผ่าน Model.fit ที่จับห่วงการฝึกอบรมภายใต้ประทุน, มีความยืดหยุ่นของ overridable train_step และเรียกกลับซึ่งจะให้ฟังก์ชันการทำงานเช่นการประหยัดด่านหรือสรุปการออมเพื่อการ TensorBoard ด้วย Model.fit รหัสการฝึกอบรมเดียวกันสามารถใช้สำหรับกลยุทธ์อื่น ๆ ที่มีการแลกเปลี่ยนที่เรียบง่ายของวัตถุกลยุทธ์ที่

ป้อนข้อมูล

Model.fit ด้วยการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์กำหนดว่าข้อมูลของท่านจะถูกระบุไว้ใน callable ที่ใช้อาร์กิวเมนต์เดียวประเภท tf.distribute.InputContext และส่งกลับ tf.data.Dataset จากนั้นสร้าง tf.keras.utils.experimental.DatasetCreator วัตถุที่ใช้เวลาดังกล่าว callable และเลือก tf.distribute.InputOptions วัตถุผ่าน input_options โต้แย้ง

โปรดทราบว่าจะแนะนำให้สับเปลี่ยนและทำซ้ำข้อมูลด้วยการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์และระบุ steps_per_epoch ใน fit เรียกร้องเพื่อให้ห้องสมุดรู้ขอบเขตยุค

โปรดดู การป้อนข้อมูลที่กระจาย การสอนสำหรับข้อมูลเพิ่มเติมเกี่ยวกับ InputContext โต้แย้ง

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

รหัสใน dataset_fn จะถูกเรียกบนอุปกรณ์ป้อนข้อมูลซึ่งมักจะ CPU, ในแต่ละเครื่องของผู้ปฏิบัติงาน

การสร้างแบบจำลองและการรวบรวม

ตอนนี้คุณจะสร้าง tf.keras.Model -a เล็กน้อย tf.keras.models.Sequential แบบจำลองสำหรับวัตถุประสงค์ตามการสาธิตโดย Model.compile เรียกร้องให้รวมส่วนประกอบเช่นเพิ่มประสิทธิภาพตัวชี้วัดหรือพารามิเตอร์เช่น steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

โทรกลับและการฝึกอบรม

ก่อนที่จะสาย model.fit สำหรับการฝึกอบรมจริงขอให้เตรียมความพร้อมการเรียกกลับที่จำเป็นสำหรับงานทั่วไปเช่น:

  • ModelCheckpoint : เพื่อบันทึกน้ำหนักรุ่น
  • BackupAndRestore : เพื่อให้แน่ใจว่าความคืบหน้าการฝึกอบรมจะได้รับการสนับสนุนโดยอัตโนมัติและการกู้คืนถ้าประสบการณ์คลัสเตอร์ unavailability (เช่นการยกเลิกหรือใบจอง); หรือ
  • TensorBoard : การบันทึกรายงานความคืบหน้าเป็นไฟล์สรุปซึ่งได้รับการมองเห็นในเครื่องมือ TensorBoard
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.experimental.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
2021-07-22 01:22:30.205180: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:30.205213: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:30.207087: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-07-22 01:22:34.281880: I tensorflow/core/profiler/lib/profiler_session.cc:131] Profiler session initializing.
2021-07-22 01:22:34.281923: I tensorflow/core/profiler/lib/profiler_session.cc:146] Profiler session started.
2021-07-22 01:22:34.290681: I tensorflow/core/profiler/lib/profiler_session.cc:66] Profiler session collecting data.
2021-07-22 01:22:34.291221: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session tear down.
2021-07-22 01:22:34.292249: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.292801: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for trace.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.trace.json.gz
2021-07-22 01:22:34.294605: I tensorflow/core/profiler/rpc/client/save_profile.cc:136] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34

2021-07-22 01:22:34.294780: I tensorflow/core/profiler/rpc/client/save_profile.cc:142] Dumped gzipped tool data for memory_profile.json.gz to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.memory_profile.json.gz
2021-07-22 01:22:34.294930: I tensorflow/core/profiler/rpc/client/capture_profile.cc:251] Creating directory: /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34
Dumped tool data for xplane.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.xplane.pb
Dumped tool data for overview_page.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.overview_page.pb
Dumped tool data for input_pipeline.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.input_pipeline.pb
Dumped tool data for tensorflow_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.tensorflow_stats.pb
Dumped tool data for kernel_stats.pb to /tmp/my_working_dir/log/plugins/profile/2021_07_22_01_22_34/kokoro-gcp-ubuntu-prod-1124739887.kernel_stats.pb

2021-07-22 01:22:34.380988: W tensorflow/core/framework/dataset.cc:679] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 - 4s - loss: 0.2856 - 4s/epoch - 201ms/step
2021-07-22 01:22:34.737150: W tensorflow/python/util/util.cc:348] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:34.993072: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.067372: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
Epoch 2/5
20/20 - 0s - loss: 0.3160 - 187ms/epoch - 9ms/step
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 3/5
20/20 - 0s - loss: 0.2000 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.567146: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:35.639496: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6ce1aeb200> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f6cfc1e5560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
Epoch 4/5
20/20 - 0s - loss: 0.2395 - 32ms/epoch - 2ms/step
2021-07-22 01:22:35.986756: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.059412: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
Epoch 5/5
20/20 - 0s - loss: 0.1527 - 32ms/epoch - 2ms/step
2021-07-22 01:22:36.403661: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.475197: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
2021-07-22 01:22:36.818981: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
2021-07-22 01:22:36.891188: I tensorflow/core/common_runtime/eager/kernel_and_device.cc:94] Ignoring error status when releasing multi-device function handle Unimplemented: Releasing a multi-device component handle on a remote device is not yet implemented.
<keras.callbacks.History at 0x7f6e7801fc50>

การใช้งานโดยตรงกับ ClusterCoordinator (อุปกรณ์เสริม)

แม้ว่าคุณจะเลือก Model.fit เส้นทางการฝึกอบรมคุณสามารถเลือกที่จะยกตัวอย่าง tf.distribute.experimental.coordinator.ClusterCoordinator วัตถุที่จะกำหนดเวลาฟังก์ชั่นอื่น ๆ ที่คุณต้องการที่จะดำเนินการในคนงาน ดู การฝึกอบรมที่มีการฝึกอบรมห่วงที่กำหนดเอง สำหรับรายละเอียดเพิ่มเติมและตัวอย่าง

การฝึกด้วยลูปการฝึกแบบกำหนดเอง

ใช้ลูปการฝึกอบรมที่กำหนดเองกับ tf.distribute.Strategy ให้มีความยืดหยุ่นที่ดีในการกำหนดลูปการฝึกอบรม ด้วย ParameterServerStrategy ที่กำหนดไว้ข้างต้น (ตาม strategy ) คุณจะใช้ tf.distribute.experimental.coordinator.ClusterCoordinator จะส่งดำเนินการตามขั้นตอนการฝึกอบรมให้กับแรงงานระยะไกล

จากนั้นคุณจะสร้างรูปแบบกำหนดชุดข้อมูลและฟังก์ชั่นขั้นตอนตามที่คุณได้ทำในวงการฝึกอบรมกับคนอื่น ๆ tf.distribute.Strategy s คุณสามารถค้นหารายละเอียดเพิ่มเติมใน การฝึกอบรมที่กำหนดเองกับ tf.distribute.Strategy กวดวิชา

เพื่อให้แน่ใจว่าการโหลดล่วงหน้าชุดข้อมูลที่มีประสิทธิภาพใช้แนะนำให้กระจาย APIs สร้างชุดข้อมูลที่กล่าวถึงใน ขั้นตอนการฝึกอบรมให้กับแรงงานส่งระยะไกล ส่วนด้านล่าง ยังให้แน่ใจว่าจะเรียก Strategy.run ภายใน worker_fn ที่จะใช้ประโยชน์อย่างเต็มที่จาก GPUs ที่จัดสรรให้กับคนงาน ขั้นตอนที่เหลือจะเหมือนกันสำหรับการฝึกแบบมีหรือไม่มี GPU

มาสร้างส่วนประกอบเหล่านี้ในขั้นตอนต่อไปนี้:

ตั้งค่าข้อมูล

ก่อนเขียนฟังก์ชั่นที่สร้างชุดข้อมูลที่มีตรรกะ preprocessing ดำเนินการโดยที่ Keras ชั้น preprocessing

คุณจะสร้างชั้นเหล่านี้นอก dataset_fn แต่ใช้การเปลี่ยนแปลงภายใน dataset_fn เนื่องจากคุณจะห่อ dataset_fn เป็น tf.function ซึ่งไม่อนุญาตให้ตัวแปรที่จะสร้างที่อยู่ภายในนั้น

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = preprocessing.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = preprocessing.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)

สร้างตัวอย่างของเล่นในชุดข้อมูล:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

จากนั้นสร้างชุดการฝึกอบรมในห่อ dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

สร้างโมเดล

ถัดไป สร้างแบบจำลองและวัตถุอื่นๆ ให้แน่ใจว่าจะสร้างตัวแปรทั้งหมดภายใต้ strategy.scope

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

ยืนยันเถอะว่าการใช้ FixedShardsPartitioner แยกตัวแปรทั้งหมดเป็นสองเศษและแต่ละชิ้นส่วนที่ได้รับมอบหมายไปยังเซิร์ฟเวอร์ของพารามิเตอร์ที่แตกต่างกัน

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

กำหนดขั้นตอนการฝึก

ประการที่สามการสร้างขั้นตอนการฝึกอบรมห่อเป็น tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

ในฟังก์ชั่นขั้นตอนการฝึกอบรมดังกล่าวข้างต้นเรียก Strategy.run และ Strategy.reduce ใน step_fn สามารถรองรับ GPUs หลายต่อคนงาน หากคนงานได้ GPUs จัดสรร Strategy.run จะแจกจ่ายชุดข้อมูลในแบบจำลองหลาย ๆ

ส่งขั้นตอนการฝึกอบรมให้กับพนักงานทางไกล

หลังจากคำนวณทั้งหมดจะถูกกำหนดโดย ParameterServerStrategy คุณจะใช้ tf.distribute.experimental.coordinator.ClusterCoordinator ชั้นเรียนเพื่อสร้างทรัพยากรและแจกจ่ายขั้นตอนการฝึกอบรมเพื่อคนไกล

Let 's แรกสร้าง ClusterCoordinator วัตถุและผ่านในวัตถุกลยุทธ์ที่ใช้:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

จากนั้น สร้างชุดข้อมูลต่อผู้ปฏิบัติงานและตัววนซ้ำ ใน per_worker_dataset_fn ด้านล่างตัด dataset_fn เข้า strategy.distribute_datasets_from_function จะแนะนำให้อนุญาตให้มีประสิทธิภาพในการโหลดล่วงหน้า GPUs ต่อเนื่อง

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

ขั้นตอนสุดท้ายคือการกระจายการคำนวณให้กับแรงงานระยะไกลโดยใช้ ClusterCoordinator.schedule :

  • schedule วิธี enqueues tf.function และผลตอบแทนในอนาคตเหมือน RemoteValue ทันที ฟังก์ชั่นการจัดคิวจะถูกส่งไปยังคนไกลในหัวข้อพื้นหลังและ RemoteValue จะเต็มไปถ่ายทอดสด
  • join วิธีการ ( ClusterCoordinator.join ) สามารถนำมาใช้จะต้องรอจนกว่าฟังก์ชั่นที่กำหนดไว้ทั้งหมดจะถูกดำเนินการ
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.668750.
Finished epoch 1, accuracy is 0.450000.
Finished epoch 2, accuracy is 1.000000.
Finished epoch 3, accuracy is 1.000000.

นี่คือวิธีการที่คุณสามารถเรียกผลมาจากการที่ RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

อีกวิธีหนึ่ง คุณสามารถเริ่มขั้นตอนทั้งหมดและทำบางสิ่งในขณะที่รอให้เสร็จสิ้นได้:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

สำหรับการฝึกอบรมและการให้บริการที่สมบูรณ์เวิร์กโฟลว์สำหรับตัวอย่างนี้โดยเฉพาะอย่างยิ่งโปรดตรวจสอบนี้ การทดสอบ

ข้อมูลเพิ่มเติมเกี่ยวกับการสร้างชุดข้อมูล

ชุดข้อมูลในรหัสดังกล่าวข้างต้นจะถูกสร้างขึ้นโดยใช้ ClusterCoordinator.create_per_worker_dataset API) จะสร้างชุดข้อมูลหนึ่งชุดต่อผู้ปฏิบัติงานและส่งคืนออบเจ็กต์คอนเทนเนอร์ คุณสามารถเรียก iter วิธีการที่มันจะสร้าง iterator ต่องาน ต่องาน iterator มีหนึ่ง iterator ต่อคนงานและชิ้นที่สอดคล้องกันของผู้ปฏิบัติงานจะได้รับการทดแทนในอาร์กิวเมนต์ใส่ของฟังก์ชั่นส่งผ่านไปยัง ClusterCoordinator.schedule วิธีการก่อนหน้าที่จะถูกดำเนินการเกี่ยวกับคนงานโดยเฉพาะอย่างยิ่ง

ปัจจุบัน ClusterCoordinator.schedule วิธีอนุมานแรงงานเทียบเท่าจึงถือว่าชุดข้อมูลแรงงานที่แตกต่างกันเหมือนเดิมยกเว้นพวกเขาอาจจะสับแตกต่างกันถ้าพวกเขามี Dataset.shuffle การดำเนินงาน ด้วยเหตุนี้มันเป็นยังแนะนำว่าชุดข้อมูลที่จะต้องทำซ้ำไปเรื่อย ๆ และคุณกำหนดเวลาจำนวน จำกัด ของขั้นตอนแทนการพึ่งพา OutOfRangeError จากชุดข้อมูล

อีกข้อสำคัญก็คือว่า tf.data ชุดข้อมูลไม่สนับสนุนอนุกรมโดยปริยายและ deserialization ข้ามขอบเขตของงาน จึงเป็นสิ่งสำคัญในการสร้างชุดข้อมูลทั้งภายในฟังก์ชันส่งผ่านไปยัง ClusterCoordinator.create_per_worker_dataset

การประเมิน

มีมากกว่าหนึ่งวิธีในการกำหนดและรันลูปการประเมินในการฝึกอบรมแบบกระจาย แต่ละคนมีข้อดีและข้อเสียของตัวเองตามที่อธิบายไว้ด้านล่าง แนะนำให้ใช้วิธีการประเมินแบบอินไลน์หากคุณไม่ต้องการ

การประเมินแบบอินไลน์

ในวิธีการนี้จะสลับระหว่างผู้ประสานงานการฝึกอบรมและการประเมินผลและดังนั้นจึงเรียกมันว่าการประเมินผลแบบอินไลน์

มีประโยชน์หลายประการของการประเมินแบบอินไลน์ ตัวอย่างเช่น:

  • สามารถรองรับแบบจำลองการประเมินขนาดใหญ่และชุดข้อมูลการประเมินที่งานเดียวไม่สามารถถือครองได้
  • ผลการประเมินสามารถใช้ในการตัดสินใจสำหรับการฝึกอบรมในยุคต่อไปได้

มีสองวิธีในการดำเนินการประเมินแบบอินไลน์: การประเมินโดยตรงและการประเมินแบบกระจาย

  • การประเมินผลโดยตรง: สำหรับรุ่นขนาดเล็กและชุดข้อมูลการประเมินผลการประสานงานสามารถเรียกใช้การประเมินผลโดยตรงในรูปแบบการกระจายกับชุดข้อมูลที่เกี่ยวกับการประเมินผลผู้ประสานงาน:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • การประเมินผลที่กระจายได้: สำหรับรุ่นขนาดใหญ่หรือชุดข้อมูลที่มีความเป็นไปไม่ได้ที่จะเรียกใช้โดยตรงในการประสานงานงานผู้ประสานงานสามารถกระจายงานการประเมินผลให้กับแรงงานที่ผ่าน ClusterCoordinator.schedule / ClusterCoordinator.join วิธีการ:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000

การประเมินด้านข้างรถ

อีกวิธีหนึ่งที่เรียกว่าการประเมินผลด้านรถที่คุณสร้างงานประเมินเฉพาะที่ซ้ำ ๆ อ่านจุดตรวจและวิ่งประเมินด่านใหม่ล่าสุด จะช่วยให้โปรแกรมการฝึกอบรมของคุณเสร็จสิ้นก่อนเวลาอันควร หากคุณไม่ต้องการเปลี่ยนรอบการฝึกตามผลการประเมิน อย่างไรก็ตาม จำเป็นต้องมีงานผู้ประเมินเพิ่มเติมและจุดตรวจเป็นระยะเพื่อกระตุ้นการประเมิน ต่อไปนี้เป็นวงจรการประเมินด้านข้างรถที่เป็นไปได้:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

กลุ่มในโลกแห่งความเป็นจริง

ในสภาพแวดล้อมการผลิตจริง คุณจะรันงานทั้งหมดในกระบวนการต่าง ๆ บนเครื่องที่แตกต่างกัน วิธีที่ง่ายที่สุดในการกำหนดค่าข้อมูลคลัสเตอร์ในแต่ละงานคือการตั้ง "TF_CONFIG" ตัวแปรสภาพแวดล้อมและใช้ tf.distribute.cluster_resolver.TFConfigClusterResolver ที่จะแยก "TF_CONFIG"

สำหรับคำอธิบายทั่วไปเกี่ยวกับ "TF_CONFIG" ตัวแปรสภาพแวดล้อมหมายถึง การฝึกอบรมการแจกจ่าย คู่มือ

หากคุณเริ่มต้นงานการฝึกอบรมของคุณโดยใช้ Kubernetes หรือแม่แบบการตั้งค่าอื่น ๆ ก็มีโอกาสมากที่แม่แบบเหล่านี้ได้กำหนดไว้แล้ว “TF_CONFIG" สำหรับคุณ

การตั้งค่า "TF_CONFIG" ตัวแปรสภาพแวดล้อม

สมมติว่าคุณมี 3 คนและ 2 เซิร์ฟเวอร์พารามิเตอร์ที่ "TF_CONFIG" ของคนงาน 1 สามารถ:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" ของผู้ประเมินสามารถ:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

"cluster" ส่วนหนึ่งในข้างต้น "TF_CONFIG" สตริงสำหรับผู้ประเมินเป็นตัวเลือก

หากคุณใช้เลขฐานสองเดียวกันสำหรับงานทั้งหมด

หากคุณต้องการรันงานเหล่านี้ทั้งหมดโดยใช้ไบนารีเดียว คุณจะต้องปล่อยให้โปรแกรมของคุณแยกสาขาออกเป็นบทบาทต่างๆ ในตอนเริ่มต้น:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

รหัสต่อไปนี้เริ่มต้นเซิร์ฟเวอร์ TensorFlow และรอ:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

การจัดการความล้มเหลวของงาน

ความล้มเหลวของผู้ปฏิบัติงาน

tf.distribute.experimental.coordinator.ClusterCoordinator หรือ Model.fit ให้ในตัวความอดทนความผิดสำหรับความล้มเหลวของผู้ปฏิบัติงาน เมื่อการกู้คืนงานฟังก์ชั่นให้ไว้ก่อนหน้าชุด (อย่างใดอย่างหนึ่งเพื่อ ClusterCoordinator.create_per_worker_dataset สำหรับห่วงการฝึกอบรมที่กำหนดเองหรือ tf.keras.utils.experimental.DatasetCreator สำหรับ Model.fit ) จะถูกเรียกคนงานที่จะสร้างใหม่อีกครั้งชุดข้อมูล

เซิร์ฟเวอร์พารามิเตอร์หรือความล้มเหลวของผู้ประสานงาน

แต่เมื่อผู้ประสานงานเห็นข้อผิดพลาดเซิร์ฟเวอร์พารามิเตอร์ก็จะยก UnavailableError หรือ AbortedError ทันที คุณสามารถรีสตาร์ทผู้ประสานงานได้ในกรณีนี้ ผู้ประสานงานเองก็อาจใช้งานไม่ได้เช่นกัน ดังนั้นจึงแนะนำให้ใช้เครื่องมือบางอย่างเพื่อไม่ให้สูญเสียความคืบหน้าในการฝึกอบรม:

  • สำหรับ Model.fit คุณควรใช้ BackupAndRestore โทรกลับซึ่งจัดการประหยัดความคืบหน้าและการฟื้นฟูโดยอัตโนมัติ ดู Callbacks และการฝึกอบรม ด้านบนสำหรับตัวอย่าง

  • สำหรับลูปการฝึกแบบกำหนดเอง คุณควรตรวจสอบตัวแปรโมเดลเป็นระยะๆ และโหลดตัวแปรโมเดลจากจุดตรวจสอบ หากมี ก่อนที่การฝึกจะเริ่มต้น ความคืบหน้าการฝึกอบรมจะสามารถสรุปโดยประมาณจาก optimizer.iterations ถ้า Optimizer เป็น checkpointed:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

กำลังคาบ RemoteValue

กำลังคาบ RemoteValue รับประกันได้ว่าจะประสบความสำเร็จถ้าฟังก์ชั่นจะถูกดำเนินการประสบความสำเร็จ เนื่องจากขณะนี้ ค่าส่งคืนจะถูกคัดลอกไปยังผู้ประสานงานทันทีหลังจากดำเนินการฟังก์ชัน หากมีความล้มเหลวของผู้ปฏิบัติงานในระหว่างการคัดลอก ฟังก์ชันนี้จะถูกลองอีกครั้งกับผู้ปฏิบัติงานอื่นที่มีอยู่ ดังนั้น ถ้าคุณต้องการปรับให้เหมาะสมสำหรับประสิทธิภาพ คุณสามารถกำหนดเวลาฟังก์ชันโดยไม่มีค่าส่งคืนได้

การรายงานข้อผิดพลาด

เมื่อประสานงานเห็นข้อผิดพลาดเช่น UnavailableError จากเซิร์ฟเวอร์พารามิเตอร์หรือข้อผิดพลาดโปรแกรมอื่น ๆ เช่น InvalidArgument จาก tf.debugging.check_numerics ก็จะยกเลิกฟังก์ชั่นที่ค้างอยู่และจัดคิวทั้งหมดก่อนที่จะเพิ่มข้อผิดพลาด การดึงข้อมูลที่สอดคล้องกันของพวกเขา RemoteValue s จะยก CancelledError

หลังจากเกิดข้อผิดพลาด ผู้ประสานงานจะไม่แจ้งข้อผิดพลาดเดิมหรือข้อผิดพลาดใดๆ จากฟังก์ชันที่ถูกยกเลิก

การปรับปรุงประสิทธิภาพ

มีเหตุผลที่เป็นไปได้หลายถ้าคุณเห็นปัญหาประสิทธิภาพการทำงานเมื่อคุณฝึกกับ ParameterServerStrategy และ ClusterResolver

สาเหตุทั่วไปประการหนึ่งคือเซิร์ฟเวอร์พารามิเตอร์มีโหลดที่ไม่สมดุลและเซิร์ฟเวอร์พารามิเตอร์ที่โหลดหนักบางตัวมีความจุถึงขีดจำกัดแล้ว นอกจากนี้ยังสามารถมีได้หลายสาเหตุ วิธีง่ายๆ ในการแก้ไขปัญหานี้คือ:

  1. Shard ตัวแปรแบบจำลองขนาดใหญ่ของคุณผ่านการระบุ variable_partitioner เมื่อสร้าง ParameterServerStrategy
  2. หลีกเลี่ยงการสร้างตัวแปรฮอตสปอตที่เซิร์ฟเวอร์พารามิเตอร์ทั้งหมดต้องการในขั้นตอนเดียว ถ้าเป็นไปได้ ตัวอย่างเช่นใช้อัตราการเรียนรู้อย่างต่อเนื่องหรือ subclass tf.keras.optimizers.schedules.LearningRateSchedule ในการเพิ่มประสิทธิภาพการทำงานเริ่มต้นตั้งแต่เป็นว่าอัตราการเรียนรู้ที่จะกลายเป็นตัวแปรที่วางอยู่บนเซิร์ฟเวอร์พารามิเตอร์โดยเฉพาะและการร้องขอจากเซิร์ฟเวอร์พารามิเตอร์อื่น ๆ ทั้งหมดในแต่ละขั้นตอน .
  3. สับเปลี่ยนคำศัพท์ขนาดใหญ่ของคุณก่อนที่จะส่งต่อไปยังเลเยอร์การประมวลผลล่วงหน้าของ Keras

อีกสาเหตุที่เป็นไปได้สำหรับปัญหาด้านประสิทธิภาพคือผู้ประสานงาน การดำเนินการครั้งแรกของ schedule / join เป็นงูหลาม-based และอาจมีค่าใช้จ่ายเกลียว นอกจากนี้ เวลาแฝงระหว่างผู้ประสานงานและพนักงานอาจมีขนาดใหญ่ หากเป็นกรณีนี้

  • สำหรับ Model.fit คุณสามารถตั้งค่า steps_per_execution อาร์กิวเมนต์ที่มีให้ ณ Model.compile ให้เป็นค่าที่มีขนาดใหญ่กว่า 1

  • สำหรับห่วงการฝึกอบรมที่กำหนดเองคุณสามารถแพ็คหลายขั้นตอนเป็นหนึ่ง tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

เนื่องจากไลบรารีได้รับการปรับให้เหมาะสมยิ่งขึ้น หวังว่าผู้ใช้ส่วนใหญ่จะไม่ต้องแพ็คขั้นตอนด้วยตนเองอีกในอนาคต

นอกจากนี้ เคล็ดลับเล็กๆ น้อยๆ สำหรับการปรับปรุงประสิทธิภาพคือการจัดกำหนดการฟังก์ชันโดยไม่มีค่าส่งคืนตามที่อธิบายไว้ในส่วนการจัดการความล้มเหลวของงานด้านบน

ข้อจำกัดที่ทราบ

ข้อจำกัดที่ทราบส่วนใหญ่มีอยู่แล้วในหัวข้อข้างต้น ส่วนนี้ให้ข้อมูลสรุป

ParameterServerStrategy ทั่วไป

  • os.environment["grpc_fail_fast"]="use_caller" เป็นสิ่งจำเป็นในงานทุกคนรวมทั้งการประสานงานเพื่อให้ทำงานได้อย่างถูกต้องยอมรับความผิด
  • ไม่รองรับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์แบบซิงโครนัส
  • โดยปกติจำเป็นต้องรวมหลายขั้นตอนไว้ในฟังก์ชันเดียวเพื่อให้ได้ประสิทธิภาพสูงสุด
  • มันไม่ได้รับการสนับสนุนในการโหลด saved_model ผ่าน tf.saved_model.load มีตัวแปร sharded หมายเหตุการโหลด save_model โดยใช้ TensorFlow Serving นั้นคาดว่าจะใช้งานได้
  • ไม่รองรับการโหลดจุดตรวจสอบที่มีตัวแปรช่องเครื่องมือเพิ่มประสิทธิภาพที่แบ่งส่วนข้อมูลลงในส่วนแบ่งข้อมูลจำนวนอื่น
  • ไม่สนับสนุนการกู้คืนจากความล้มเหลวของเซิร์ฟเวอร์พารามิเตอร์โดยไม่ต้องรีสตาร์ทงานผู้ประสานงาน
  • การใช้งานของ tf.lookup.StaticHashTable (ซึ่งมักจะถูกใช้โดยบาง tf.keras.layers.experimental.preprocessing ชั้นเช่น IntegerLookup , StringLookup และ TextVectorization ) ส่งผลให้ทรัพยากรที่วางอยู่บนประสานงานในครั้งนี้ด้วยการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ สิ่งนี้มีผลกระทบด้านประสิทธิภาพสำหรับการค้นหา RPC จากผู้ปฏิบัติงานไปยังผู้ประสานงาน นี่คือลำดับความสำคัญสูงในการแก้ไขในปัจจุบัน

Model.fit เฉพาะ

  • steps_per_epoch อาร์กิวเมนต์เป็นสิ่งจำเป็นใน Model.fit คุณสามารถเลือกค่าที่ให้ช่วงเวลาที่เหมาะสมในยุค
  • ParameterServerStrategy ไม่ได้มีการสนับสนุนสำหรับการเรียกกลับที่กำหนดเองที่มีสายชุดระดับสำหรับเหตุผลประสิทธิภาพ คุณควรแปลงสายที่โทรเข้ามาในยุคในระดับเดียวกับหยิบเหมาะสม steps_per_epoch เพื่อที่พวกเขาจะเรียกว่าทุก steps_per_epoch จำนวนของขั้นตอน การเรียกกลับในตัวจะไม่ได้รับผลกระทบ: การเรียกระดับแบทช์ของพวกเขาได้รับการแก้ไขเพื่อให้มีประสิทธิภาพ สนับสนุนชุดสายระดับ ParameterServerStrategy จะถูกวางแผน
  • ด้วยเหตุผลเดียวกัน แถบความคืบหน้าและเมตริกต่างจากกลยุทธ์อื่นๆ ที่บันทึกเฉพาะที่ขอบเขตของยุคเท่านั้น
  • run_eagerly ไม่สนับสนุน

ข้อมูลเฉพาะของลูปการฝึกแบบกำหนดเอง