การทำงานกับ ClientData ของ tff

ดูบน TensorFlow.org ทำงานใน Google Colab ดูแหล่งที่มาบน GitHub ดาวน์โหลดโน๊ตบุ๊ค

แนวคิดของชุดข้อมูลที่คีย์โดยไคลเอ็นต์ (เช่น ผู้ใช้) มีความสำคัญต่อการคำนวณแบบรวมศูนย์ตามแบบจำลองใน TFF ฉิบหายให้ติดต่อ tff.simulation.datasets.ClientData ที่เป็นนามธรรมมากกว่าแนวคิดนี้และชุดข้อมูลที่โฮสต์ฉิบหาย ( StackOverflow , เช็คสเปียร์ , emnist , cifar100 และ gldv2 ) ทั้งหมดใช้อินเตอร์เฟซนี้

ถ้าคุณกำลังทำงานกับการเรียนรู้แบบ federated กับชุดของคุณเองฉิบหายขอสนับสนุนให้คุณสามารถดำเนินการอย่างใดอย่างหนึ่ง ClientData อินเตอร์เฟซหรือการใช้หนึ่งฉิบหายของฟังก์ชั่นผู้ช่วยในการสร้าง ClientData ซึ่งหมายถึงข้อมูลของคุณบนดิสก์เช่น tff.simulation.datasets.ClientData.from_clients_and_fn .

เป็นที่สุดของฉิบหายของตัวอย่างแบบ end-to-end ที่เริ่มต้นด้วย ClientData วัตถุ, การดำเนินการ ClientData อินเตอร์เฟซที่มีชุดข้อมูลที่กำหนดเองของคุณจะทำให้มันง่ายต่อการ spelunk ผ่านรหัสที่มีอยู่เขียนด้วยฉิบหาย นอกจากนี้ tf.data.Datasets ซึ่ง ClientData โครงสร้างสามารถซ้ำมากกว่าโดยตรงเพื่อให้โครงสร้างของ numpy อาร์เรย์ดังนั้น ClientData วัตถุสามารถนำมาใช้กับกรอบ ML หลามตามก่อนที่จะย้ายไปฉิบหาย

มีหลายรูปแบบที่คุณสามารถทำให้ชีวิตของคุณง่ายขึ้นหากคุณต้องการขยายการจำลองของคุณไปยังหลายเครื่องหรือปรับใช้ ด้านล่างเราจะเดินผ่านไม่กี่วิธีที่เราสามารถใช้ ClientData และฉิบหายที่จะทำให้ขนาดเล็กย้ำการขนาดใหญ่การทดลองการผลิตประสบการณ์การใช้งานของเราเป็นไปอย่างราบรื่นที่สุดเท่าที่ทำได้

ฉันควรใช้รูปแบบใดในการส่งผ่าน ClientData ไปยัง TFF

เราจะหารือสองประเพณีของฉิบหายของ ClientData ในเชิงลึก; หากคุณเหมาะสมกับประเภทใดประเภทหนึ่งจากสองหมวดหมู่ด้านล่างนี้ คุณจะเลือกประเภทใดประเภทหนึ่งมากกว่าประเภทอื่นอย่างชัดเจน หากไม่เป็นเช่นนั้น คุณอาจต้องทำความเข้าใจรายละเอียดข้อดีข้อเสียของแต่ละรายการให้ละเอียดยิ่งขึ้นเพื่อตัดสินใจเลือกให้เหมาะสมยิ่งขึ้น

  • ฉันต้องการวนซ้ำโดยเร็วที่สุดบนเครื่องท้องถิ่น ฉันไม่จำเป็นต้องใช้ประโยชน์จากรันไทม์แบบกระจายของ TFF ได้ง่ายๆ

    • คุณต้องการที่จะผ่าน tf.data.Datasets ในฉิบหายโดยตรง
    • นี้จะช่วยให้คุณสามารถตั้งโปรแกรม imperatively กับ tf.data.Dataset วัตถุและดำเนินการให้โดยพลการ
    • ให้ความยืดหยุ่นมากกว่าตัวเลือกด้านล่าง การผลักตรรกะไปยังไคลเอนต์ต้องการให้ตรรกะนี้เป็นอนุกรมได้
  • ฉันต้องการเรียกใช้การคำนวณแบบรวมศูนย์ในรันไทม์ระยะไกลของ TFF หรือฉันวางแผนที่จะดำเนินการในเร็วๆ นี้

    • ในกรณีนี้ คุณต้องการแมปการสร้างชุดข้อมูลและการประมวลผลล่วงหน้ากับไคลเอ็นต์
    • ผลนี้ในตัวคุณผ่านเพียงรายการ client_ids โดยตรงกับการคำนวณรวมของคุณ
    • การผลักดันการสร้างชุดข้อมูลและการประมวลผลล่วงหน้าไปยังไคลเอนต์จะช่วยหลีกเลี่ยงปัญหาคอขวดในการทำให้เป็นอนุกรม และเพิ่มประสิทธิภาพอย่างมากกับลูกค้าหลายแสนราย

ตั้งค่าสภาพแวดล้อมโอเพ่นซอร์ส

นำเข้าแพ็คเกจ

การจัดการวัตถุ ClientData

ขอเริ่มต้นด้วยการโหลดและการสำรวจฉิบหายของ EMNIST ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s]
2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

ตรวจสอบชุดแรกที่สามารถบอกเราว่าสิ่งที่ประเภทของตัวอย่างอยู่ใน ClientData

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

โปรดทราบว่าชุดข้อมูลที่อัตราผลตอบแทน collections.OrderedDict วัตถุที่มี pixels และ label กุญแจที่พิกเซลเป็นเมตริกซ์ที่มีรูปร่าง [28, 28] สมมติว่าเราต้องการที่จะแผ่ปัจจัยการผลิตของเราออกมาให้มีรูปทรง [784] วิธีการหนึ่งที่เป็นไปได้ที่เราจะสามารถทำเช่นนี้จะนำไปใช้ฟังก์ชั่นการประมวลผลก่อนที่จะเรา ClientData วัตถุ

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

เราอาจต้องการดำเนินการล่วงหน้าที่ซับซ้อนมากขึ้น (และอาจเป็นการเก็บสถานะ) เพิ่มเติม เช่น การสับเปลี่ยน

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

เชื่อมต่อกับ tff.Computation

ตอนนี้ที่เราสามารถดำเนินกิจวัตรพื้นฐานบางอย่างกับ ClientData วัตถุเราพร้อมที่จะข้อมูลฟีดกับ tff.Computation เรากำหนด tff.templates.IterativeProcess ซึ่งดำเนิน สหพันธ์ Averaging และสำรวจวิธีการที่แตกต่างกันของการส่งผ่านข้อมูลที่มัน

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

ก่อนที่เราจะเริ่มต้นการทำงานกับเรื่องนี้ IterativeProcess หนึ่งความคิดเห็นเกี่ยวกับความหมายของ ClientData อยู่ในลำดับ ClientData วัตถุแสดงให้เห็นถึงความสมบูรณ์ของประชากรที่มีอยู่สำหรับการฝึกอบรมแบบ federated ซึ่งโดยทั่วไปจะ ไม่สามารถใช้ได้กับสภาพแวดล้อมการดำเนินการของการผลิตระบบฟลอริด้า และเป็นเฉพาะกับการจำลอง ClientData แน่นอนช่วยให้ผู้ใช้สามารถที่จะข้ามคอมพิวเตอร์ federated สิ้นเชิงและก็ฝึกรูปแบบที่ฝั่งเซิร์ฟเวอร์ตามปกติผ่าน ClientData.create_tf_dataset_from_all_clients

สภาพแวดล้อมการจำลองของ TFF ทำให้ผู้วิจัยสามารถควบคุมวงรอบนอกได้อย่างสมบูรณ์ โดยเฉพาะอย่างยิ่ง นี่แสดงถึงการพิจารณาความพร้อมใช้งานของไคลเอ็นต์ การออกจากระบบของไคลเอ็นต์ ฯลฯ จะต้องได้รับการแก้ไขโดยผู้ใช้หรือสคริปต์ไดรเวอร์ Python หนึ่งสามารถสำหรับการออกกลางคันของลูกค้าตัวอย่างเช่นรูปแบบโดยการปรับการกระจายการสุ่มตัวอย่างของคุณมากกว่า ClientData's client_ids ดังกล่าวว่าผู้ใช้ที่มีข้อมูลมากขึ้น (และตามลําดับอีกต่อไปทำงานคำนวณท้องถิ่น) จะได้รับการคัดเลือกด้วยความน่าจะต่ำกว่า

อย่างไรก็ตาม ในระบบรวมจริง ไคลเอ็นต์ไม่สามารถเลือกโดยผู้ฝึกสอนแบบจำลองได้อย่างชัดเจน การเลือกไคลเอนต์จะมอบหมายให้กับระบบที่กำลังดำเนินการคำนวณแบบรวมศูนย์

ผ่าน tf.data.Datasets โดยตรงกับฉิบหาย

ทางเลือกหนึ่งที่เรามีสำหรับการเชื่อมต่อระหว่าง ClientData และ IterativeProcess คือการสร้าง tf.data.Datasets ในหลามและผ่านชุดข้อมูลเหล่านี้เพื่อฉิบหาย

ขอให้สังเกตว่าถ้าเราใช้ preprocessed เรา ClientData ชุดข้อมูลที่เราให้ผลผลิตเป็นประเภทที่เหมาะสมโดยคาดว่ารูปแบบของเราที่กำหนดไว้ข้างต้น

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 2.9005744457244873, round time 4.576513767242432
loss 3.113278388977051, round time 0.49641919136047363
loss 2.7581865787506104, round time 0.4904160499572754
loss 2.87259578704834, round time 0.48976993560791016
loss 3.1202380657196045, round time 0.6724586486816406

ถ้าเราใช้เส้นทางนี้ แต่เราจะไม่สามารถที่จะย้ายไปนิดจำลอง multimachine ชุดข้อมูลที่เราสร้างในรันไทม์ TensorFlow ท้องถิ่นสามารถจับภาพรัฐจากสภาพแวดล้อมหลามโดยรอบและล้มเหลวในการอนุกรมหรือ deserialization เมื่อพวกเขาพยายามที่จะอ้างอิงรัฐซึ่งไม่สามารถใช้ได้กับพวกเขา นี้สามารถประจักษ์เช่นในข้อผิดพลาดลึกลับจาก TensorFlow ของ tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

การสร้างแผนที่และการประมวลผลล่วงหน้าเหนือลูกค้า

เพื่อหลีกเลี่ยงปัญหานี้ฉิบหายแนะนำให้ผู้ใช้ในการพิจารณา instantiation ชุดและ preprocessing เป็นสิ่งที่เกิดขึ้นในประเทศที่ลูกค้าแต่ละคนและจะใช้ผู้ช่วยเหลือฉิบหายหรือ federated_map ใช้รหัส preprocessing นี้อย่างชัดเจนที่ลูกค้าแต่ละราย

ตามแนวคิดแล้ว เหตุผลในการเลือกสิ่งนี้ชัดเจน: ในรันไทม์ท้องถิ่นของ TFF ลูกค้า "โดยบังเอิญ" เท่านั้นที่สามารถเข้าถึงสภาพแวดล้อม Python ทั่วโลกได้ เนื่องจากข้อเท็จจริงที่ว่าการประสานรวมทั้งหมดเกิดขึ้นในเครื่องเดียว เป็นที่น่าสังเกตว่า ณ จุดนี้ความคิดที่คล้ายคลึงกันก่อให้เกิดปรัชญาการทำงานข้ามแพลตฟอร์มของ TFF ที่เรียงลำดับได้เสมอและใช้งานได้

ฉิบหายทำให้การเปลี่ยนแปลงดังกล่าวโดยง่ายผ่านทาง ClientData's แอตทริบิวต์ dataset_computation เป็น tff.Computation ซึ่งจะนำ client_id และผลตอบแทนที่เกี่ยวข้อง tf.data.Dataset

โปรดทราบว่า preprocess ก็ทำงานกับ dataset_computation ; dataset_computation แอตทริบิวต์ของ preprocessed ClientData ประกอบด้วยท่อ preprocessing ทั้งเราเพียงแค่กำหนด:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

เราสามารถเรียก dataset_computation และได้รับชุดข้อมูลที่กระตือรือร้นในรันไทม์หลาม แต่อำนาจที่แท้จริงของวิธีการนี้จะใช้สิทธิเมื่อเราประกอบกับกระบวนการซ้ำหรือการคำนวณเพื่อหลีกเลี่ยงการ materializing ชุดข้อมูลเหล่านี้ในรันไทม์กระตือรือร้นระดับโลกที่ทุกคนอีก ฉิบหายมีฟังก์ชั่นผู้ช่วย tff.simulation.compose_dataset_computation_with_iterative_process ซึ่งสามารถใช้ในการทำตรงนี้

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

ทั้งสองนี้ tff.templates.IterativeProcesses และเหนือวิ่งในทางเดียวกัน; แต่อดีตยอมรับชุดข้อมูลลูกค้า preprocessed และหลังยอมรับสตริงตัวแทนรหัสลูกค้า, การจัดการโครงการก่อสร้างของทั้งชุดและ preprocessing ในร่างกายของ - ในความเป็นจริง state สามารถส่งผ่านระหว่างสอง

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023
loss 2.7670371532440186, round time 0.5207102298736572
loss 2.665048122406006, round time 0.5302855968475342
loss 2.7213189601898193, round time 0.5313887596130371
loss 2.580148935317993, round time 0.5283482074737549

ขยายสู่ลูกค้าจำนวนมาก

trainer_accepting_ids ทันทีสามารถนำมาใช้ในการรันไทม์ multimachine ฉิบหายของและหลีกเลี่ยงการรายอื่น tf.data.Datasets และการควบคุม (และดังนั้นจึง serializing พวกเขาและส่งพวกเขาออกไปคนงาน)

สิ่งนี้ช่วยเร่งความเร็วการจำลองแบบกระจาย โดยเฉพาะอย่างยิ่งกับไคลเอนต์จำนวนมาก และเปิดใช้งานการรวมระดับกลางเพื่อหลีกเลี่ยงค่าใช้จ่ายการทำให้เป็นอนุกรม/ดีซีเรียลไลซ์เซชันที่คล้ายคลึงกัน

ทางเลือกเชิงลึก: การเขียนตรรกะก่อนการประมวลผลใน TFF . ด้วยตนเอง

TFF ได้รับการออกแบบมาเพื่อการจัดองค์ประกอบตั้งแต่ต้นจนจบ ประเภทขององค์ประกอบที่เพิ่งดำเนินการโดยผู้ช่วยของ TFF นั้นอยู่ในการควบคุมของเราอย่างเต็มที่ในฐานะผู้ใช้ เราจะได้มีการคำนวณด้วยตนเองประกอบการ preprocessing เราก็กำหนดไว้กับครูฝึกของตัวเอง next มากเพียง:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

อันที่จริง นี่คือสิ่งที่ผู้ช่วยที่เราใช้ทำอย่างมีประสิทธิภาพภายใต้ประทุน (รวมถึงการตรวจสอบประเภทและการจัดการที่เหมาะสม) เรายังสามารถได้แสดงตรรกะเดียวกันแตกต่างกันเล็กน้อยโดย serializing preprocess_and_shuffle เป็น tff.Computation และย่อยสลาย federated_map เข้าไปในขั้นตอนเดียวซึ่งสร้างชุดข้อมูลยกเลิก preprocessed และอื่น ๆ ซึ่งวิ่ง preprocess_and_shuffle ที่ลูกค้าแต่ละราย

เราสามารถยืนยันได้ว่าเส้นทางแบบแมนนวลมากกว่านี้ส่งผลให้มีการคำนวณด้วยลายเซ็นประเภทเดียวกับตัวช่วยของ TFF (ชื่อพารามิเตอร์โมดูลาร์):

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)