Zapisz datę! Google I / O powraca w dniach 18-20 maja Zarejestruj się teraz
Ta strona została przetłumaczona przez Cloud Translation API.
Switch to English

Praca z ClientData tff.

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło w serwisie GitHub Pobierz notatnik

Pojęcie zbioru danych z kluczem klientów (np. Użytkowników) jest kluczowe dla obliczeń federacyjnych, zgodnie z modelem w TFF. TFF zapewnia interfejs tff.simulation.datasets.ClientData do abstrahowania od tej koncepcji, a tff.simulation.datasets.ClientData danych, które hostuje TFF ( stackoverflow , shakespeare , emnist , cifar100 i gldv2 ), wszystkie implementują ten interfejs.

Jeśli pracujesz nad uczeniem federacyjnym z własnym ClientData danych, TFF zdecydowanie zachęca do zaimplementowania interfejsu ClientData lub użycia jednej z funkcji pomocniczych TFF do generowania ClientData które reprezentują twoje dane na dysku, np. tff.simulation.datasets.ClientData.from_clients_and_fn .

Ponieważ większość kompleksowych przykładów TFF zaczyna się od obiektów ClientData , zaimplementowanie interfejsu ClientData z niestandardowym ClientData danych ułatwi rozpisanie istniejącego kodu napisanego za pomocą TFF. Ponadto tf.data.Datasets które ClientData można iterować bezpośrednio, aby uzyskać struktury tablic numpy , więc obiekty ClientData mogą być używane z dowolną strukturą ML opartą na Pythonie przed przejściem do TFF.

Istnieje kilka wzorców, dzięki którym możesz ułatwić sobie życie, jeśli zamierzasz skalować symulacje do wielu maszyn lub je wdrażać. Poniżej omówimy kilka sposobów wykorzystania ClientData i TFF, aby nasza iteracja na małą skalę - od eksperymentów na dużą skalę - do wdrożenia produkcyjnego była tak płynna, jak to tylko możliwe.

Którego wzorca należy użyć, aby przekazać ClientData do TFF?

Omówimy szczegółowo dwa zastosowania ClientData TFF; jeśli pasujesz do jednej z dwóch poniższych kategorii, zdecydowanie wolisz jedną od drugiej. Jeśli nie, możesz potrzebować bardziej szczegółowego zrozumienia zalet i wad każdego z nich, aby dokonać bardziej szczegółowego wyboru.

  • Chcę jak najszybciej przeprowadzić iterację na komputerze lokalnym; Nie muszę mieć możliwości łatwego korzystania z rozproszonego środowiska uruchomieniowego TFF.

    • Chcesz przekazać tf.data.Datasets bezpośrednio do TFF.
    • Pozwala to na bezwzględne programowanie z obiektamitf.data.Dataset i dowolne ich przetwarzanie.
    • Zapewnia większą elastyczność niż opcja poniżej; wypychanie logiki do klientów wymaga, aby ta logika była możliwa do serializacji.
  • Chcę uruchomić obliczenia federacyjne w zdalnym środowisku wykonawczym TFF lub planuję to zrobić wkrótce.

    • W tym przypadku chcesz zmapować budowę i wstępne przetwarzanie zbioru danych do klientów.
    • Powoduje to przekazanie po prostu listy client_ids bezpośrednio do obliczeń federacyjnych.
    • Przekazywanie konstrukcji zestawu danych i wstępnego przetwarzania do klientów pozwala uniknąć wąskich gardeł w serializacji i znacznie zwiększa wydajność w przypadku setek do tysięcy klientów.

Skonfiguruj środowisko open source

Importuj pakiety

INFO:tensorflow:Enabling eager execution
INFO:tensorflow:Enabling v2 tensorshape
INFO:tensorflow:Enabling resource variables
INFO:tensorflow:Enabling tensor equality
INFO:tensorflow:Enabling control flow v2
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_addons/utils/ensure_tf_install.py:43: UserWarning: You are currently using a nightly version of TensorFlow (2.5.0-dev20210326). 
TensorFlow Addons offers no support for the nightly versions of TensorFlow. Some things might work, some other might not. 
If you encounter a bug, do not file an issue on GitHub.
  UserWarning,

Manipulowanie obiektem ClientData

Zacznijmy od załadowania i zbadania danych ClientData EMNIST ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:21<00:00, 8090587.95it/s]

Sprawdzanie pierwszego zbioru danych może nam powiedzieć, jakiego typu przykłady znajdują się w ClientData .

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

Zwróć uwagę, że zbiór danych dostarcza obiektów collections.OrderedDict które mają pixels i klucze label , gdzie piksele to tensor o kształcie [28, 28] . Załóżmy, że chcemy spłaszczyć nasze dane wejściowe do kształtu [784] . Jednym z możliwych sposobów byłoby zastosowanie funkcji przetwarzania wstępnego do naszego obiektu ClientData .

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

Oprócz tego możemy chcieć wykonać bardziej złożone (i prawdopodobnie stanowe) przetwarzanie wstępne, na przykład tasowanie.

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

tff.Computation z tff.Computation

Teraz, gdy możemy wykonać podstawowe ClientData obiektach ClientData , jesteśmy gotowi do przesłania danych do tff.Computation . Definiujemy tff.templates.IterativeProcess który implementuje federacyjne uśrednianie i badamy różne metody przekazywania danych.

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.Input(shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.
WARNING:tensorflow:Please add `keras.layers.InputLayer` instead of `keras.Input` to Sequential model. `keras.Input` is intended to be used by Functional model.

Zanim zaczniemy pracę z tym IterativeProcess , jeden komentarz dotyczący semantyki ClientData jest w porządku. Obiekt ClientData reprezentuje całą populację dostępną do szkolenia federacyjnego, które na ogół nie jest dostępne dla środowiska wykonawczego produkcyjnego systemu FL i jest specyficzne dla symulacji. ClientData rzeczywiście daje użytkownikowi możliwość całkowitego ominięcia przetwarzania federacyjnego i po prostu trenowania modelu po stronie serwera, jak zwykle, za pośrednictwem ClientData.create_tf_dataset_from_all_clients .

Środowisko symulacyjne TFF daje badaczowi pełną kontrolę nad zewnętrzną pętlą. W szczególności oznacza to, że kwestie dotyczące dostępności klienta, rezygnacji z klienta itp. Muszą zostać uwzględnione przez użytkownika lub skrypt sterownika Python. Można na przykład modelować porzucanie klienta, dostosowując rozkład próbkowania według ClientData's client_ids danych client_ids tak, aby użytkownicy posiadający więcej danych (i odpowiednio dłużej działające obliczenia lokalne) byli wybierani z mniejszym prawdopodobieństwem.

Jednak w prawdziwym systemie federacyjnym klienci nie mogą być wybierani jawnie przez trenera modeli; wybór klientów jest delegowany do systemu wykonującego obliczenia stowarzyszone.

Przekazywanie tf.data.Datasets bezpośrednio do TFF

Jedną z opcji, które mamy do połączenia między ClientData i IterativeProcess jest konstruowanie tf.data.Datasets w Pythonie i przekazywanie tych zestawów danych do TFF.

Zwróć uwagę, że jeśli korzystamy z naszych wstępnie przetworzonych ClientData przez nas zestawy danych są odpowiedniego typu, jakiego oczekuje nasz model zdefiniowany powyżej.

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:59: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:59: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 3.2094147205352783, round time 4.7707977294921875
loss 2.8317034244537354, round time 0.5864472389221191
loss 2.7795557975769043, round time 0.6025238037109375
loss 2.821730613708496, round time 0.5769219398498535
loss 2.8752756118774414, round time 0.5830614566802979

Jeśli jednak pójdziemy tą drogą, nie będziemy mogli w trywialny sposób przejść do symulacji wielomaszynowej . Zestawy danych, które konstruujemy w lokalnym środowisku wykonawczym TensorFlow, mogą przechwytywać stan z otaczającego środowiska języka Python i kończyć się niepowodzeniem w serializacji lub deserializacji, gdy próbują odwołać się do stanu, który nie jest już dla nich dostępny. Może się to objawiać na przykład w nieodgadnionym błędzie z pliku tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

Mapowanie budowy i wstępnego przetwarzania na klientach

Aby uniknąć tego problemu, TFF zaleca swoim użytkownikom, aby traktowali tworzenie instancji zestawu danych i przetwarzanie wstępne jako coś, co dzieje się lokalnie na każdym kliencie , oraz używanie pomocników TFF lub federated_map do jawnego uruchamiania kodu przetwarzania wstępnego na każdym kliencie.

Koncepcyjnie powód preferowania tego jest jasny: w lokalnym środowisku wykonawczym TFF klienci „przypadkowo” mają dostęp do globalnego środowiska Python, ponieważ cała federacyjna orkiestracja odbywa się na pojedynczej maszynie. W tym miejscu warto zauważyć, że podobne myślenie daje początek wieloplatformowej, zawsze możliwej do serializacji filozofii funkcjonalnej TFF.

TFF sprawia, że zmiana taka prosta poprzez ClientData's atrybut dataset_computation , a tff.Computation który odbywa się client_id i zwraca powiązany tf.data.Dataset .

Zauważ, że preprocess po prostu działa z dataset_computation ; dataset_computation atrybutem obróbką ClientData zawiera całą rurociągu przebiegu wyprzedzającego po prostu zdefiniowane:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

Moglibyśmy wywołać dataset_computation i otrzymać gotowy zestaw danych w środowisku wykonawczym Pythona, ale prawdziwą moc tego podejścia wykorzystuje się, gdy komponujemy za pomocą procesu iteracyjnego lub innego obliczenia, aby w ogóle uniknąć materializacji tych zestawów danych w globalnym, niecierpliwym środowisku wykonawczym. TFF zapewnia funkcję pomocniczą tff.simulation.compose_dataset_computation_with_iterative_process która może być użyta do tego właśnie.

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

Zarówno ten tff.templates.IterativeProcesses i powyższy, działają w ten sam sposób; ale pierwszy akceptuje wstępnie przetworzone zestawy danych klienta, a drugi akceptuje ciągi znaków reprezentujących identyfikatory klientów, obsługując zarówno konstrukcję zestawu danych, jak i przetwarzanie wstępne w jego treści - w rzeczywistości state może być przekazywany między nimi.

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.9357950687408447, round time 3.450307846069336
loss 3.060990333557129, round time 0.5913786888122559
loss 2.682624340057373, round time 0.59104323387146
loss 2.5586647987365723, round time 0.7655746936798096
loss 2.663414716720581, round time 0.5824642181396484

Skalowanie do dużej liczby klientów

trainer_accepting_ids może być natychmiast użyty w środowisku tf.data.Datasets TFF multimachine i pozwala uniknąć materializacji tf.data.Datasets i kontrolera (a tym samym serializacji ich i wysyłania do pracowników).

To znacznie przyspiesza symulacje rozproszone, szczególnie w przypadku dużej liczby klientów, i umożliwia pośrednią agregację, aby uniknąć podobnych obciążeń związanych z serializacją / deserializacją.

Opcjonalne szczegółowe omówienie: ręczne tworzenie logiki przetwarzania wstępnego w TFF

TFF jest zaprojektowany od podstaw z myślą o kompozycjach; rodzaj kompozycji, którą właśnie wykonał pomocnik TFF, jest w pełni pod naszą kontrolą jako użytkownikami. Moglibyśmy ręcznie skomponować obliczenie przetwarzania wstępnego, które właśnie zdefiniowaliśmy, z next po prostu własnym trenerem:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

W rzeczywistości to właśnie robi pomocnik, którego użyliśmy, pod maską (plus przeprowadzanie odpowiedniego sprawdzania typu i manipulacji). Mogliśmy nawet wyraziły taką samą logikę nieco inaczej, przez szeregowania preprocess_and_shuffle w tff.Computation i rozkładając federated_map w jeden krok, który konstruuje un-preprocesowany zestawów danych i innej, która biegnie preprocess_and_shuffle na każdego klienta.

Możemy sprawdzić, czy ta bardziej ręczna ścieżka skutkuje obliczeniami z tym samym podpisem typu, co pomocnik TFF (nazwy parametrów modulo):

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)