Triển khai Tổng hợp tùy chỉnh

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Trong hướng dẫn này, chúng tôi giải thích nguyên tắc thiết kế phía sau tff.aggregators mô-đun và thực hành tốt nhất cho việc thực hiện kết hợp tùy chỉnh các giá trị từ khách hàng đến máy chủ.

Điều kiện tiên quyết. Hướng dẫn này giả định rằng bạn đã quen thuộc với các khái niệm cơ bản của Federated cốt lõi như vị trí ( tff.SERVER , tff.CLIENTS ), làm thế nào TFF đại diện cho tính toán ( tff.tf_computation , tff.federated_computation ) và chữ ký kiểu của chúng.

!pip install --quiet --upgrade tensorflow_federated_nightly
!pip install --quiet --upgrade nest_asyncio

import nest_asyncio
nest_asyncio.apply()

Tóm tắt thiết kế

Trong TFF, "tập hợp" đề cập đến sự chuyển động của một tập hợp các giá trị trên tff.CLIENTS để tạo ra một giá trị tổng hợp của cùng loại trên tff.SERVER . Có nghĩa là, mỗi giá trị khách hàng riêng lẻ không cần phải có sẵn. Ví dụ: trong học tập liên kết, các bản cập nhật mô hình máy khách được tính trung bình để có được bản cập nhật mô hình tổng hợp để áp dụng cho mô hình chung trên máy chủ.

Ngoài khai thác hoàn thành mục tiêu này như tff.federated_sum , TFF cung cấp tff.templates.AggregationProcess (một quá trình stateful ) mà chính thức hóa các loại chữ ký cho tính toán tập hợp để nó có thể khái quát các hình thức phức tạp hơn một số tiền đơn giản.

Các thành phần chính của tff.aggregators module là các nhà máy để tạo của AggregationProcess , mà được thiết kế để xây dựng các khối chung hữu ích và replacable của TFF trong hai khía cạnh:

  1. Tính toán tham số hóa. Aggregation là một khối xây dựng độc lập có thể được cắm vào module TFF khác được thiết kế để làm việc với tff.aggregators để parameterize hợp cần thiết của họ.

Thí dụ:

learning_process = tff.learning.build_federated_averaging_process(
    ...,
    model_update_aggregation_factory=tff.aggregators.MeanFactory())
  1. Thành phần tổng hợp. Một khối xây dựng tổng hợp có thể được cấu tạo với các khối xây dựng tập hợp khác để tạo ra các tập hợp tổng hợp phức tạp hơn.

Thí dụ:

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

Phần còn lại của hướng dẫn này giải thích cách đạt được hai mục tiêu này.

Quy trình tổng hợp

Đầu tiên chúng ta tóm tắt các tff.templates.AggregationProcess , và làm theo với các mô hình nhà máy để tạo nó.

Các tff.templates.AggregationProcess là một tff.templates.MeasuredProcess với chữ ký kiểu được chỉ định cho tập hợp. Trong đó, initializenext chức năng có chữ ký loại sau đây:

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

Trạng thái (loại state_type ) phải được đặt ở máy chủ. Các next chức năng mất như là đối số đầu vào nhà nước và một giá trị được tổng hợp (loại value_type ) được đặt tại khách hàng. Các * phương tiện bắt buộc đối số đầu vào khác, ví dụ trọng lượng trong một bình quân gia quyền. Nó trả về một đối tượng trạng thái được cập nhật, giá trị tổng hợp của cùng một loại được đặt tại máy chủ và một số phép đo.

Lưu ý rằng cả hai tiểu bang được thông qua giữa hành của next chức năng, và các phép đo thông báo nhằm thông báo bất kỳ thông tin tùy thuộc vào một thực hiện cụ thể của next chức năng, có thể rỗng. Tuy nhiên, chúng phải được quy định rõ ràng cho các phần khác của TFF để có hợp đồng rõ ràng để tuân theo.

Module TFF khác, ví dụ như các bản cập nhật mô hình trong tff.learning , dự kiến sẽ sử dụng tff.templates.AggregationProcess để parameterize cách giá trị được tổng hợp. Tuy nhiên, các giá trị được tổng hợp chính xác là gì và các ký hiệu kiểu của chúng là gì, phụ thuộc vào các chi tiết khác của mô hình đang được đào tạo và thuật toán học tập được sử dụng để thực hiện điều đó.

Để thực hiện sự kết hợp độc lập với các khía cạnh khác của tính toán, chúng tôi sử dụng mô hình nhà máy - chúng tôi tạo ra thích hợp tff.templates.AggregationProcess khi chữ ký kiểu có liên quan của các đối tượng được tổng hợp có sẵn, bằng cách gọi các create phương pháp của nhà máy. Do đó, việc xử lý trực tiếp quá trình tổng hợp chỉ cần thiết cho các tác giả thư viện, những người chịu trách nhiệm về việc tạo ra này.

Nhà máy quy trình tổng hợp

Có hai lớp nhà máy cơ sở trừu tượng để tổng hợp không trọng số và có trọng số. Họ create phương pháp có loại chữ ký của giá trị sẽ được tổng hợp và trả về một tff.templates.AggregationProcess cho tập hợp các giá trị như vậy.

Quá trình tạo ra bởi tff.aggregators.UnweightedAggregationFactory nhận hai đối số đầu vào: (1) nhà nước tại máy chủ và (2) giá trị xác định loại value_type .

An thực hiện ví dụ là tff.aggregators.SumFactory .

Quá trình tạo ra bởi tff.aggregators.WeightedAggregationFactory mất ba đối số đầu vào: (1) nhà nước tại máy chủ, (2) giá trị xác định loại value_type và (3) trọng lượng của loại weight_type , theo quy định của người sử dụng của nhà máy khi gọi nó create phương pháp.

An thực hiện ví dụ là tff.aggregators.MeanFactory mà tính trung bình gia quyền.

Mô hình nhà máy là cách chúng tôi đạt được mục tiêu đầu tiên đã nêu ở trên; tập hợp đó là một khối xây dựng độc lập. Ví dụ, khi thay đổi biến mô hình nào có thể đào tạo được, một tập hợp phức tạp không nhất thiết phải thay đổi; nhà máy đại diện cho nó sẽ được gọi bằng một loại chữ ký khác nhau khi được sử dụng bởi một phương pháp như tff.learning.build_federated_averaging_process .

Sáng tác

Nhớ lại rằng một quy trình tổng hợp chung có thể đóng gói (a) một số xử lý trước các giá trị tại các máy khách, (b) sự di chuyển của các giá trị từ máy khách đến máy chủ và (c) một số quá trình hậu xử lý giá trị tổng hợp tại máy chủ. Mục tiêu thứ hai nêu trên, thành phần tập hợp, được thực hiện bên trong tff.aggregators mô-đun bằng cách cơ cấu thi hành các nhà máy tập hợp như một phần (b) có thể được giao cho một nhà máy tập hợp.

Thay vì triển khai tất cả logic cần thiết trong một lớp nhà máy duy nhất, việc triển khai theo mặc định được tập trung vào một khía cạnh duy nhất có liên quan để tổng hợp. Khi cần, mẫu này cho phép chúng tôi thay thế các khối xây dựng tại một thời điểm.

Một ví dụ là trọng tff.aggregators.MeanFactory . Việc triển khai của nó nhân các giá trị và trọng số được cung cấp tại các máy khách, sau đó tính cả các giá trị và trọng số có trọng số một cách độc lập, sau đó chia tổng các giá trị có trọng số cho tổng trọng số tại máy chủ. Thay vì thực hiện summations bằng cách trực tiếp bằng cách sử dụng tff.federated_sum điều hành, tổng được giao cho hai trường hợp của tff.aggregators.SumFactory .

Cấu trúc như vậy làm cho hai tổng mặc định có thể được thay thế bằng các nhà máy khác nhau, các nhà máy này nhận ra tổng khác nhau. Ví dụ, một tff.aggregators.SecureSumFactory , hoặc thực hiện tùy chỉnh của tff.aggregators.UnweightedAggregationFactory . Ngược lại, thời gian, tff.aggregators.MeanFactory có thể bản thân là một tập hợp bên trong của nhà máy khác như tff.aggregators.clipping_factory , nếu các giá trị sẽ được cắt bớt trước khi trung bình.

Xem trước chỉnh đề nghị quy tụ cho việc học hướng dẫn cho các mục đích receommended của cơ chế thành phần sử dụng các nhà máy hiện có trong tff.aggregators module.

Các phương pháp hay nhất theo ví dụ

Chúng tôi sẽ minh họa cho tff.aggregators khái niệm một cách chi tiết bằng cách thực hiện một nhiệm vụ ví dụ đơn giản, và làm cho nó dần dần phổ biến hơn. Một cách khác để tìm hiểu là xem xét tình hình thực hiện của các nhà máy hiện có.

import collections
import tensorflow as tf
import tensorflow_federated as tff

Thay vì tổng hợp value , nhiệm vụ ví dụ là tổng value * 2.0 và sau đó chia cho tổng bằng 2.0 . Kết quả tổng hợp là như vậy, về mặt toán học tương đương với trực tiếp tổng hợp các value , và có thể được coi là bao gồm ba phần: (1) mở rộng quy mô tại khách hàng (2) tổng hợp trên khách hàng (3) unscaling tại máy chủ.

Sau khi thiết kế giải thích ở trên, logic sẽ được thực hiện như một lớp con của tff.aggregators.UnweightedAggregationFactory , mà tạo ra thích hợp tff.templates.AggregationProcess khi đưa ra một value_type để tổng hợp:

Thực hiện tối thiểu

Đối với nhiệm vụ ví dụ, các tính toán cần thiết luôn giống nhau, vì vậy không cần trạng thái sử dụng. Do đó trống rỗng, và nó được thể hiện dưới dạng tff.federated_value((), tff.SERVER) . Điều tương tự đối với các phép đo, bây giờ.

Do đó, việc thực hiện nhiệm vụ tối thiểu như sau:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Mọi thứ có hoạt động như mong đợi hay không có thể được xác minh bằng mã sau:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

Tính xác thực và các phép đo

Tính trạng thái được sử dụng rộng rãi trong TFF để biểu diễn các phép tính dự kiến ​​sẽ được thực hiện lặp đi lặp lại và thay đổi theo mỗi lần lặp. Ví dụ, trạng thái của một phép tính học tập chứa các trọng số của mô hình đang được học.

Để minh họa cách sử dụng trạng thái trong tính toán tổng hợp, chúng tôi sửa đổi tác vụ ví dụ. Thay vì nhân value bằng 2.0 , chúng ta nhân nó bằng chỉ số lặp - số lần tập hợp đã được thực hiện.

Để làm như vậy, chúng ta cần một cách để theo dõi chỉ số lặp, chỉ số này đạt được thông qua khái niệm trạng thái. Trong initialize_fn , thay vì tạo ra một trạng thái trống rỗng, chúng ta khởi tạo trạng thái để có một số không vô hướng. Sau đó, nhà nước có thể được sử dụng trong next_fn trong ba bước: (1) tăng bằng 1.0 , (2) sử dụng để nhân value , và (3) trở lại như trạng thái được cập nhật mới.

Một khi điều này được thực hiện, bạn có thể lưu ý: Nhưng chính xác mã tương tự như trên có thể được sử dụng để xác minh tất cả các công trình như mong đợi. Làm thế nào để tôi biết một cái gì đó đã thực sự thay đổi?

Câu hỏi hay! Đây là nơi mà khái niệm về các phép đo trở nên hữu ích. Nói chung, đo lường có thể báo cáo bất kỳ giá trị liên quan đến một thực đơn của next chức năng, có thể được sử dụng để theo dõi. Trong trường hợp này, nó có thể là summed_value từ ví dụ trước. Đó là, giá trị trước bước "hủy quy mô", giá trị này sẽ phụ thuộc vào chỉ số lặp. Một lần nữa, điều này không nhất thiết hữu ích trong thực tế, nhưng minh họa cơ chế liên quan.

Do đó, câu trả lời rõ ràng cho nhiệm vụ trông như sau:

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tf_computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tf_computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Lưu ý rằng các state mà đi vào next_fn như là đầu vào được đặt ở máy chủ. Để sử dụng nó ở khách hàng, đầu tiên nó cần phải được truyền đạt, mà là đạt được bằng cách sử dụng tff.federated_broadcast điều hành.

Để xác minh tất cả các công trình như mong đợi, bây giờ chúng ta có thể nhìn vào các báo cáo measurements , mà nên là khác nhau với mỗi vòng thi, ngay cả khi chạy với cùng client_data .

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

Các loại có cấu trúc

Các trọng số mô hình của một mô hình được đào tạo trong phương pháp học liên hợp thường được biểu diễn dưới dạng một tập hợp các tensor, chứ không phải là một tensor đơn lẻ. Trong TFF, điều này được thể hiện dưới dạng tff.StructType và nhà máy tập hợp thường hữu ích cần để có thể chấp nhận các loại cấu trúc.

Tuy nhiên, trong các ví dụ trên, chúng tôi chỉ làm việc với một tff.TensorType đối tượng. Nếu chúng ta cố gắng sử dụng các nhà máy trước để tạo ra quá trình tổng hợp với một tff.StructType([(tf.float32, (2,)), (tf.float32, (3,))]) , chúng tôi nhận được một lỗi lạ vì TensorFlow sẽ cố gắng để nhân một tf.Tensor và một list .

Vấn đề là thay vì nhân cấu trúc của tensors bởi một hằng số, chúng ta cần phải nhân mỗi tensor trong cấu trúc bởi một hằng số. Giải pháp thông thường cho vấn đề này là sử dụng tf.nest bên trong mô-đun của tạo tff.tf_computation s.

Các phiên bản của trước ExampleTaskFactory tương thích với các loại cấu trúc như vậy, trông như sau:

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Ví dụ này nêu bật một mẫu có thể hữu ích để làm theo khi cấu trúc mã TFF. Khi không phải đối phó với các hoạt động rất đơn giản, mã trở nên rõ ràng hơn khi tff.tf_computation s mà sẽ được sử dụng như các khối xây dựng bên trong một tff.federated_computation được tạo ra ở một nơi riêng biệt. Bên trong tff.federated_computation , những khối xây dựng chỉ được kết nối bằng cách sử dụng các nhà khai thác nội tại.

Để xác minh nó hoạt động như mong đợi:

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(tf.float32, (2,)), (tf.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

Tổng hợp bên trong

Bước cuối cùng là tùy chọn cho phép ủy quyền tập hợp thực tế cho các nhà máy khác, để cho phép dễ dàng thành phần các kỹ thuật tổng hợp khác nhau.

Điều này đạt được bằng cách tạo ra một tùy chọn inner_factory tranh cãi trong các nhà xây dựng của chúng tôi ExampleTaskFactory . Nếu không quy định, tff.aggregators.SumFactory được sử dụng, trong đó áp dụng các tff.federated_sum điều hành sử dụng trực tiếp trong phần trước.

Khi create được gọi là, đầu tiên chúng ta có thể gọi create của inner_factory để tạo ra quá trình tổng hợp bên trong với cùng value_type .

Tình trạng của quá trình của chúng tôi được trả về bởi initialize_fn là một thành phần của hai phần: nhà nước tạo ra bởi quá trình "này", và tình trạng của quá trình bên trong vừa tạo.

Việc thực hiện next_fn khác ở chỗ các tập hợp thực tế được giao cho next chức năng của quá trình bên trong, và về cách các sản phẩm cuối cùng được sáng tác. Nhà nước một lần nữa bao gồm "này" và nhà nước "bên trong", và các phép đo được cấu tạo một cách tương tự như một OrderedDict .

Sau đây là một triển khai của mô hình như vậy.

@tff.tf_computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tf_computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tf_computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

Khi phân cấp cho inner_process.next chức năng, cơ cấu lợi nhuận, chúng tôi nhận được là một tff.templates.MeasuredProcessOutput , với cùng ba lĩnh vực - state , resultmeasurements . Khi tạo cấu trúc lại toàn bộ quá trình tổng hợp bao gồm, các statemeasurements ruộng nên thường sáng tác và trở về với nhau. Ngược lại, result tương ứng với lĩnh vực với giá trị được tổng hợp và thay vào đó "chảy qua" sự kết hợp sáng tác.

Các state đối tượng nên được xem như một chi tiết thi hành nhà máy, và do đó các thành phần có thể là của bất kỳ cấu trúc. Tuy nhiên, measurements tương ứng với giá trị được báo cáo cho người dùng tại một số điểm. Vì vậy, chúng tôi khuyên bạn nên sử dụng OrderedDict , với sáng tác đặt tên như vậy mà nó sẽ được rõ ràng nơi trong một thành phần nào một báo cáo số liệu đến từ đâu.

Cũng lưu ý việc sử dụng các tff.federated_zip điều hành. Các state đối tượng contolled bởi quá trình tạo nên một tff.FederatedType . Nếu chúng ta thay vì trở lại (this_state, inner_state) trong initialize_fn , chữ ký kiểu trả về của nó sẽ là một tff.StructType chứa 2-tuple của tff.FederatedType s. Việc sử dụng tff.federated_zip "thang máy" các tff.FederatedType đến cấp cao nhất. Đây là tương tự được sử dụng trong next_fn khi chuẩn bị trạng thái và các phép đo để được trả lại.

Cuối cùng, chúng ta có thể thấy cách điều này có thể được sử dụng với tập hợp bên trong mặc định:

client_data = (1.0, 2.0, 5.0)
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... và với một tập hợp bên trong khác. Ví dụ, một ExampleTaskFactory :

client_data = (1.0, 2.0, 5.0)
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(tf.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

Tóm lược

Trong hướng dẫn này, chúng tôi đã giải thích các phương pháp hay nhất để làm theo để tạo một khối xây dựng tổng hợp có mục đích chung, được biểu diễn dưới dạng một nhà máy tổng hợp. Tính tổng quát thể hiện qua mục đích thiết kế theo hai cách:

  1. Tính toán tham số hóa. Aggregation là một khối xây dựng độc lập có thể được cắm vào module TFF khác được thiết kế để làm việc với tff.aggregators để parameterize hợp cần thiết, chẳng hạn như tff.learning.build_federated_averaging_process .
  2. Thành phần tổng hợp. Một khối xây dựng tổng hợp có thể được cấu tạo với các khối xây dựng tập hợp khác để tạo ra các tập hợp tổng hợp phức tạp hơn.