Praca z danymi klienta tff.

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Pojęcie zestawu danych z kluczami klientów (np. użytkowników) jest niezbędne do obliczeń federacyjnych, jak modelowano w TFF. TFF zapewnia interfejs tff.simulation.datasets.ClientData abstrahować nad tą koncepcją, a zbiory danych, które hosty TFF ( stackoverflow , Szekspir , emnist , cifar100 i gldv2 ) wszystko wdrożyć ten interfejs.

Jeśli pracujesz na stowarzyszonym nauki z własnego zbioru danych, TFF silnie zachęca albo wdrożyć ClientData jeden interfejs lub użyć funkcji pomocniczych TFF, aby wygenerować ClientData który reprezentuje dane na dysku, np tff.simulation.datasets.ClientData.from_clients_and_fn .

Jak większość z end-to-end przykładów TFF zaczynają z ClientData obiektów, wdrożenia ClientData interfejs z niestandardowego zestawu danych będzie łatwiej spelunk pośrednictwem istniejącego kodu napisanego z TFF. Ponadto, tf.data.Datasets które ClientData konstrukty mogą być iterowane się bezpośrednio z wytworzeniem struktury numpy tablicach, tak ClientData przedmiotów można stosować Pythona oparciu ramami ML przed przejściem do TFF.

Istnieje kilka wzorców, dzięki którym możesz ułatwić sobie życie, jeśli zamierzasz skalować swoje symulacje na wielu maszynach lub je wdrażać. Poniżej będziemy chodzić przez kilka sposobów możemy wykorzystać ClientData i TFF, aby nasz małą skalę iteracja-na dużą skalę eksperymenty, do produkcji doświadczenie wdrażania gładka jak to możliwe.

Jakiego wzorca powinienem użyć, aby przekazać ClientData do TFF?

Omówimy dwa zwyczaje TFF za ClientData głębokości; jeśli pasujesz do jednej z dwóch poniższych kategorii, wyraźnie wolisz jedną od drugiej. Jeśli nie, możesz potrzebować bardziej szczegółowego zrozumienia zalet i wad każdego z nich, aby dokonać bardziej zniuansowanego wyboru.

  • Chcę jak najszybciej wykonać iterację na komputerze lokalnym; Nie muszę być w stanie łatwo korzystać z rozproszonego środowiska wykonawczego TFF.

    • Chcesz przekazać tf.data.Datasets do TFF bezpośrednio.
    • To pozwala na zaprogramowanie koniecznie z tf.data.Dataset obiektów oraz przetwarzać je w sposób arbitralny.
    • Zapewnia większą elastyczność niż opcja poniżej; wypychanie logiki do klientów wymaga, aby ta logika była możliwa do serializacji.
  • Chcę uruchomić swoje sfederowane obliczenia w zdalnym środowisku wykonawczym TFF lub planuję to zrobić wkrótce.

    • W takim przypadku chcesz odwzorować budowę i wstępne przetwarzanie zestawu danych na klientów.
    • To skutkuje Ciebie przechodząc po prostu listę client_ids bezpośrednio do stowarzyszonego obliczeń.
    • Wypychanie konstrukcji zestawu danych i wstępnego przetwarzania do klientów pozwala uniknąć wąskich gardeł w serializacji i znacznie zwiększa wydajność w przypadku setek do tysięcy klientów.

Skonfiguruj środowisko open-source

Importuj paczki

Manipulowanie obiektem ClientData

Zacznijmy od załadunku i odkrywania TFF za EMNIST ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s]
2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

Kontrola pierwszego zestawu danych może nam powiedzieć, jakiego rodzaju przykłady są w ClientData .

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

Należy pamiętać, że wydajność zbioru danych collections.OrderedDict obiektów, które mają pixels i label klawiszy gdzie pikseli jest napinacz w kształcie [28, 28] . Załóżmy, że chcemy wyprostować nasze wejść się do kształtu [784] . Jednym z możliwych sposobów możemy to zrobić byłoby zastosować funkcję wstępnego przetwarzania do naszego ClientData obiektu.

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

Możemy chcieć dodatkowo wykonać bardziej złożone (i prawdopodobnie stanowe) przetwarzanie wstępne, na przykład tasowanie.

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

Relacje ze tff.Computation

Teraz możemy wykonać pewne podstawowe manipulacje z ClientData obiektów, jesteśmy gotowi do danych zasilających do tff.Computation . Definiujemy tff.templates.IterativeProcess który implementuje Federalne Uśrednianie i zbadać różne sposoby przekazując mu dane.

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

Zanim rozpoczniemy pracę z tym IterativeProcess , jeden komentarz na semantyki ClientData jest w porządku. ClientData obiekt reprezentuje całość populacji dostępnej dla stowarzyszonym szkolenia, które w ogóle jest niedostępna dla środowiska wykonanie systemu produkcyjnego FL i jest specyficzny dla symulacji. ClientData rzeczywiście daje użytkownikowi zdolność do ominięcia stowarzyszonym computing całkowicie i po prostu trenować model po stronie serwera jak zwykle poprzez ClientData.create_tf_dataset_from_all_clients .

Środowisko symulacyjne TFF daje badaczowi pełną kontrolę nad zewnętrzną pętlą. W szczególności oznacza to, że względy dostępności klienta, porzucania klienta itp. muszą być uwzględnione w skrypcie użytkownika lub sterownika Pythona. Można by na przykład model klient porzucaniu przez dostosowanie rozkładu próbkowania nad ClientData's client_ids taki, że użytkownicy z większej ilości danych (i odpowiednio dłużej działa lokalnych obliczeń) zostanie wybrany z niższym prawdopodobieństwem.

W rzeczywistym systemie sfederowanym klienci nie mogą być jednak wyraźnie wybierani przez trenera modelu; wybór klientów jest delegowany do systemu, który wykonuje obliczenia sfederowane.

Przechodząc tf.data.Datasets bezpośrednio do TFF

Jedną z opcji mamy na współpracę między ClientData i IterativeProcess polega na konstruowaniu tf.data.Datasets w Pythonie i przekazywanie tych zestawów danych do TFF.

Należy zauważyć, że, gdy używamy wstępnie przygotowane ClientData zestawy danych, to jest wydajność odpowiedniego rodzaju oczekiwanego naszym modelu określono powyżej.

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 2.9005744457244873, round time 4.576513767242432
loss 3.113278388977051, round time 0.49641919136047363
loss 2.7581865787506104, round time 0.4904160499572754
loss 2.87259578704834, round time 0.48976993560791016
loss 3.1202380657196045, round time 0.6724586486816406

Jeśli weźmiemy tę trasę, jednak będziemy w stanie przenieść się trywialnie multimachine symulacji. Zestawy danych budujemy w lokalnym środowisku wykonawczym TensorFlow może przechwycić stan z otaczającego środowiska Pythona, a nie w serializacji lub deserializacji gdy próbują stanu odniesienia, który nie jest już dostępna na nich jest. Może się to objawiać na przykład w niezgłębionej błędu z TensorFlow za tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

Mapowanie konstrukcji i preprocessingu nad klientami

Aby uniknąć tego problemu, TFF zaleca użytkownikom rozważyć zestawu danych instancji i przerób jako coś, co dzieje się lokalnie na każdym kliencie i użyć pomocników TFF lub federated_map jawnie uruchomić ten kod przerób na każdym kliencie.

Koncepcyjnie powód preferowania tego jest jasny: w lokalnym środowisku wykonawczym TFF klienci mają dostęp do globalnego środowiska Python tylko „przypadkowo”, ponieważ cała sfederowana orkiestracja odbywa się na jednym komputerze. Warto w tym miejscu zauważyć, że podobne myślenie daje początek wieloplatformowej, zawsze możliwej do serializacji, funkcjonalnej filozofii TFF.

TFF sprawia, że zmiana taka prosta poprzez ClientData's atrybut dataset_computation , a tff.Computation który odbywa się client_id i zwraca powiązany tf.data.Dataset .

Zauważ, że preprocess po prostu działa z dataset_computation ; dataset_computation atrybutem obróbką ClientData zawiera całą rurociągu przebiegu wyprzedzającego po prostu zdefiniowane:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

Możemy powołać dataset_computation i otrzymać chętny zestaw danych w czasie wykonywania Pythona, ale prawdziwa siła tego podejścia jest wykonywana gdy komponujemy z iteracyjnego procesu lub innego obliczeń uniknięcia materializacji tych zbiorów danych w globalnym chętny wykonywania w ogóle. TFF zapewnia funkcję pomocnika tff.simulation.compose_dataset_computation_with_iterative_process który może być używany do dokładnie to zrobić.

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

Zarówno ten tff.templates.IterativeProcesses i jeden powyżej uruchomić ten sam sposób; ale były akceptuje wstępnie przygotowane zestawy danych klienta, a ten ostatni akceptuje ciągów reprezentujących identyfikatory klientów, obsługi zarówno budowę zestawu danych i przerób w jego ciele - w rzeczywistości state mogą być przekazywane między nimi.

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023
loss 2.7670371532440186, round time 0.5207102298736572
loss 2.665048122406006, round time 0.5302855968475342
loss 2.7213189601898193, round time 0.5313887596130371
loss 2.580148935317993, round time 0.5283482074737549

Skalowanie do dużej liczby klientów

trainer_accepting_ids mogą być natychmiast wykorzystane w multimachine wykonywania TFF, a unika materializacji tf.data.Datasets i kontroler (a zatem ich szeregowania i wysyłając je do robotników).

To znacznie przyspiesza symulacje rozproszone, zwłaszcza w przypadku dużej liczby klientów, i umożliwia pośrednią agregację, aby uniknąć podobnych kosztów związanych z serializacją/deserializacją.

Opcjonalne deepdive: ręczne komponowanie logiki przetwarzania wstępnego w TFF

TFF jest zaprojektowany od podstaw z myślą o kompozycyjności; rodzaj kompozycji wykonanej właśnie przez pomocnika TFF jest w pełni pod naszą kontrolą jako użytkowników. Mogliśmy ręcznie komponować obliczeń przebiegu wyprzedzającego właśnie zdefiniowana z własnego trener next całkiem prosto:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

W rzeczywistości to właśnie robi pod maską pomocnik, którego użyliśmy (plus wykonanie odpowiedniego sprawdzenia typu i manipulacji). Mogliśmy nawet wyraziły taką samą logikę nieco inaczej, przez szeregowania preprocess_and_shuffle w tff.Computation i rozkładając federated_map w jeden krok, który konstruuje un-preprocesowany zestawów danych i innej, która biegnie preprocess_and_shuffle na każdego klienta.

Możemy zweryfikować, że ta bardziej ręczna ścieżka prowadzi do obliczeń z taką samą sygnaturą typu jak pomocnik TFF (nazwy parametrów modulo):

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)