ML Topluluk Günü 9 Kasım! TensorFlow, JAX güncellemeler için bize katılın ve daha fazla bilgi edinin

tff'nin ClientData ile çalışmak.

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

İstemciler (örneğin kullanıcılar) tarafından anahtarlanan bir veri kümesi kavramı, TFF'de modellendiği gibi birleşik hesaplama için esastır. TFF arayüzü sağlayan tff.simulation.datasets.ClientData bu kavramın üzerinde soyut etmek ve hangi TFF barındıran (veri kümeleri stackoverflow , shakespeare , emnist , cifar100 ve gldv2 ) Tüm bu arabirimini uygular.

Kendi veri kümesi ile federe öğrenmeye çalışıyorsanız, TFF kuvvetle ya uygulamak için teşvik ClientData bir oluşturmak için TFF'nin yardımcı fonksiyonlarının arayüzü veya kullanım birini ClientData örn diskte veri temsil tff.simulation.datasets.ClientData.from_clients_and_fn .

TFF'nin uçtan uca örneklerin çoğunda ile başlarken ClientData uygulanması, nesneleri ClientData TFF ile yazılan mevcut kodu ile mağaraları araştırmak için daha kolay hale getirecek özel veri kümesi ile arabirim. Bundan başka, tf.data.Datasets ClientData yapılar yapılarını elde etmek için, doğrudan üzerinde yinelenebilen numpy böylece dizileri, ClientData nesneleri TFF geçmeden önce bir Python tabanlı ML çerçeve ile kullanılabilir.

Simülasyonlarınızı birçok makineye ölçeklendirmeyi veya bunları dağıtmayı düşünüyorsanız, hayatınızı kolaylaştırabileceğiniz birkaç model vardır. Biz kullanabilirsiniz yollardan birkaç adım adım inceleyeceğiz Aşağıda ClientData bizim küçük ölçekli büyük ölçekli iterasyon-üretim deneme-to dağıtım deneyimini hale getirmek ve TFF mümkün olduğunca pürüzsüz.

ClientData'yı TFF'ye geçirmek için hangi kalıbı kullanmalıyım?

Biz TFF'nin iki kullanımlarını tartışacağız ClientData derinlemesine; Aşağıdaki iki kategoriden birine uyuyorsanız, açıkça birini diğerine tercih edeceksiniz. Değilse, daha nüanslı bir seçim yapmak için her birinin artılarını ve eksilerini daha ayrıntılı bir şekilde anlamanız gerekebilir.

  • Yerel bir makinede olabildiğince çabuk yinelemek istiyorum; TFF'nin dağıtılmış çalışma zamanından kolayca faydalanabilmem gerekmiyor.

    • Geçmek istediğiniz tf.data.Datasets doğrudan TFF'ye içinde.
    • Bu birlikte imperatively programlamak için izin verir tf.data.Dataset nesneleri ve keyfi olarak bunları işlemek.
    • Aşağıdaki seçeneğe göre daha fazla esneklik sağlar; mantığı istemcilere zorlamak, bu mantığın serileştirilebilir olmasını gerektirir.
  • Birleşik hesaplamamı TFF'nin uzak çalışma zamanında çalıştırmak istiyorum veya yakında yapmayı planlıyorum.

    • Bu durumda, veri kümesi oluşturma ve ön işlemeyi istemcilerle eşleştirmek istersiniz.
    • Sende bu sonuçlar sadece bir listesini geçirerek client_ids doğrudan federe hesaplama için.
    • Veri kümesi oluşturma ve ön işlemeyi istemcilere zorlamak, serileştirmedeki darboğazları önler ve yüzlerce ila binlerce istemcinin performansını önemli ölçüde artırır.

Açık kaynak ortamını kurun

Paketleri içe aktar

ClientData nesnesini manipüle etme

En yükleme ve TFF'nin EMNIST keşfederek başlayalım ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s]
2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

İlk veri kümesini incelenmesi bulunan örneklerin ne tür bize söyleyebilir ClientData .

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

Veri kümesi verimleri bu Not collections.OrderedDict olan nesneleri pixels ve label piksel şekli olan bir tensör olan anahtarlar, [28, 28] . Biz şekline dışarı bizim girdileri düzleştirmek isteyen varsayalım [784] . Bunu yapabileceğimiz bir yolu bizim için bir ön işleme fonksiyonunu uygulamak olacaktır ClientData nesne.

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

Ek olarak, örneğin karıştırma gibi daha karmaşık (ve muhtemelen durum bilgisi olan) bir ön işleme gerçekleştirmek isteyebiliriz.

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

Bir ile Etkileşim tff.Computation

Şimdi bazı temel manipülasyonlar gerçekleştirebileceği ClientData nesneler, bir besleme verilerine hazır tff.Computation . Bir tanımlayan tff.templates.IterativeProcess uygulayan Federal ortalama alma ve bunu veri geçen farklı yöntemler araştırmak.

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

Bu çalışmaya başlamadan önce IterativeProcess , anlamlarını da bir açıklama ClientData sırada bulunuyor. Bir ClientData amacı genel olarak birleşik eğitim için nüfusun tamamını temsil üretim FL sisteminin yürütme ortamında mevcut değildir ve simülasyon özgüdür. ClientData gerçekten tamamen kullanıcı baypas birleşik işlem kapasitesi sayesinde ve basit yoluyla, her zamanki gibi bir sunucu tarafı modeli tren ClientData.create_tf_dataset_from_all_clients .

TFF'nin simülasyon ortamı, araştırmacıya dış döngünün tam kontrolünü verir. Özellikle bu, istemci kullanılabilirliği, istemciden ayrılma vb. hususların kullanıcı veya Python sürücü komut dosyası tarafından ele alınması gerektiği anlamına gelir. Bir senin üzerinde örnekleme dağılımını ayarlayarak örnek modeli istemci yarıda bırakılması için olabilir ClientData's client_ids fazla veriyle kullanıcılar (buna ve yerel hesaplamaları uzun süren) böyle düşük olasılık dikkate olacaktır.

Ancak gerçek bir birleşik sistemde, istemciler model eğiticisi tarafından açıkça seçilemez; istemcilerin seçimi, birleşik hesaplamayı yürüten sisteme devredilir.

Geçme tf.data.Datasets TFF doğrudan

Biz arasındaki arayüz için sahip Seçeneklerden biri ClientData ve IterativeProcess o inşa taşımaktadır tf.data.Datasets Python ve TFF bu veri setlerini geçen.

Bizim ön işlenmiş kullanırsanız Bildirimi o ClientData biz verim veri kümeleri yukarıda tanımlanan bizim model tarafından beklenen uygun tiptedir.

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 2.9005744457244873, round time 4.576513767242432
loss 3.113278388977051, round time 0.49641919136047363
loss 2.7581865787506104, round time 0.4904160499572754
loss 2.87259578704834, round time 0.48976993560791016
loss 3.1202380657196045, round time 0.6724586486816406

Biz bu yolu Ancak, biz trivially Multimachine simülasyon geçmek mümkün olmayacaktır. Yerel TensorFlow yürütmesinde inşa veri kümeleri çevreleyen piton ortamdan durumunu yakalamak ve bunlar artık mevcut onlara olan referans durumuna çalıştığınızda serileştirme veya seri kaldırma başarısız olabilir. Bu TensorFlow en dan gizemli hata örneğin gösterebilir tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

Müşteriler üzerinde harita oluşturma ve ön işleme

Bu sorunu önlemek için, TFF kullanıcılarının veri kümesi örnekleme ve her bir istemci yerel olur bir şey olarak ön işleme dikkate almak ve TFF'nin yardımcıları kullanabilir veya önerir federated_map açıkça her müşteri bu ön işleme kod çalıştırmasına.

Kavramsal olarak, bunu tercih etmenin nedeni açıktır: TFF'nin yerel çalışma zamanında, tüm birleşik düzenlemenin tek bir makinede gerçekleşmesi nedeniyle, istemciler yalnızca "yanlışlıkla" global Python ortamına erişebilir. Bu noktada benzer düşüncenin TFF'nin platformlar arası, her zaman seri hale getirilebilir, işlevsel felsefesini doğurduğunu belirtmekte fayda var.

TFF aracılığıyla böyle bir değişiklik BASİTLEŞTİRİYOR ClientData's nitelik dataset_computation , bir tff.Computation bir sürer client_id ve ilgili döner tf.data.Dataset .

Not preprocess basitçe çalışır dataset_computation ; dataset_computation ön işlenen bir nitelik ClientData biz sadece tanımlanmış tüm ön işleme boru hattı içermektedir:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

Biz uygulayarak herhangi dataset_computation ve Python Çalışma zamanındaki bir istekli veri kümesini alıyoruz, ancak iteratif süreç ya hiç küresel istekli çalışma zamanında bu veri setlerini hayata önlemek için başka hesaplama ile oluşturduğunuzda bu yaklaşımın gerçek gücü uygulanır. TFF bir yardımcı işlevi sağlayan tff.simulation.compose_dataset_computation_with_iterative_process tam olarak bunu yapmak için de kullanılabilir.

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

Hem bu tff.templates.IterativeProcesses ve işletilen aynı şekilde yukarıda biri; ancak önceki, önceden işlenmiş bir istemci veri kümeleri kabul eder ve ikinci müşteri kimlikleri temsil iplikler, her iki veri kümesi yapı taşıma ve vücutta ön işleme kabul - aslında state ikisi arasında geçirilebilir.

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023
loss 2.7670371532440186, round time 0.5207102298736572
loss 2.665048122406006, round time 0.5302855968475342
loss 2.7213189601898193, round time 0.5313887596130371
loss 2.580148935317993, round time 0.5283482074737549

Çok sayıda müşteriye ölçeklendirme

trainer_accepting_ids hemen TFF'nin Multimachine çalışma zamanında kullanılabilir ve kaçınır hayata tf.data.Datasets ve denetleyici (ve dolayısıyla bunları seri ve işçilere onları gönderme).

Bu, özellikle çok sayıda istemciyle dağıtılmış simülasyonları önemli ölçüde hızlandırır ve benzer serileştirme/seri hale getirme ek yükünü önlemek için ara toplamayı sağlar.

İsteğe bağlı derin dalış: TFF'de ön işleme mantığını manuel olarak oluşturma

TFF, baştan sona kompozisyonsallık için tasarlanmıştır; TFF'nin yardımcısının az önce yaptığı beste türü, kullanıcılar olarak tamamen bizim kontrolümüzdedir. Biz sadece tanımlanmış ön işleme hesaplama elle sahip oluşturmak olabilir eğitmen kendi next oldukça basit:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

Aslında, kullandığımız yardımcının kaputun altında yaptığı etkili bir şekilde budur (artı uygun tip kontrolü ve manipülasyonu gerçekleştirir). Hatta seri ile hafifçe farklı aynı mantık ifade olabilir preprocess_and_shuffle bir içine tff.Computation ve ayrışan federated_map un ön işleme veri setleri ve çalışır olan bir inşa bir basamak halinde preprocess_and_shuffle her bir müşteride,.

Bu daha manuel yolun, TFF'nin yardımcısı (modulo parametre adları) ile aynı tip imzaya sahip hesaplamalarla sonuçlandığını doğrulayabiliriz:

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)