עזרה להגן על שונית המחסום הגדולה עם TensorFlow על Kaggle הצטרפו אתגר

עבודה עם ClientData של tff.

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

הרעיון של מערך נתונים המבוסס על ידי לקוחות (למשל משתמשים) חיוני לחישוב מאוחד כפי שעוצב ב-TFF. TFF מספק ממשק tff.simulation.datasets.ClientData כדי מופשט מעל המושג הזה, ואת מערכי נתונים אשר המארחים TFF ( StackOverflow , שייקספיר , emnist , cifar100 , ו gldv2 ) כל ליישם ממשק זה.

אם אתה עובד על למידה Federated עם הנתונים שלכם, TFF ממליצה לך גם ליישם את ClientData אחד ממשק או שימוש בפונקציות העוזר של TFF ליצור ClientData המייצגת את הנתונים בדיסק, למשל tff.simulation.datasets.ClientData.from_clients_and_fn .

כמו רוב דוגמאות מקצה לקצה של TFF להתחיל עם ClientData אובייקטים, יישום ClientData ממשק עם הנתונים מותאמים אישית שלך תקל על spelunk באמצעות הקוד הקיים בכתב עם TFF. יתר על כן, tf.data.Datasets אשר ClientData המבנים ניתן iterated מעל ישירות להניב מבנים של numpy מערכים, כך ClientData עצמים ניתן להשתמש עם כל מסגרת פיתון מבוססי ML לפני שעבר TFF.

ישנם מספר דפוסים שבאמצעותם אתה יכול להקל על חייך אם אתה מתכוון להגדיל את הסימולציות שלך למכונות רבות או לפרוס אותן. להלן נסקור כמה מן הדרכים שבהן אנו יכולים להשתמש ClientData ו TFF לעשות בקנה מידה קטן שלנו איטרציה-אל-מידה גדול ניסויים-ייצור ניסיון פריסה חלקה ככל האפשר.

באיזה דפוס עלי להשתמש כדי להעביר ClientData לתוך TFF?

נדונו בשני שימושים של של TFF ClientData לעומק; אם אתה מתאים לאחת משתי הקטגוריות שלהלן, ברור שתעדיף אחת על השנייה. אם לא, ייתכן שתזדקק להבנה מפורטת יותר של היתרונות והחסרונות של כל אחד מהם כדי לעשות בחירה יותר ניואנסית.

  • אני רוצה לחזור כמה שיותר מהר על מכונה מקומית; אני לא צריך להיות מסוגל לנצל בקלות את זמן הריצה המבוזר של TFF.

    • אתה רוצה להעביר tf.data.Datasets כדי TFF ישירות.
    • זה מאפשר לך לתכנת במפגיע עם tf.data.Dataset אובייקטים, ולעבד אותם באופן שרירותי.
    • זה מספק יותר גמישות מהאפשרות שלהלן; דחיפה של לוגיקה ללקוחות דורשת שהלוגיקה הזו תהיה ניתנת לסידרה.
  • אני רוצה להפעיל את החישוב המאוחד שלי בזמן ריצה מרחוק של TFF, או שאני מתכנן לעשות זאת בקרוב.

    • במקרה זה ברצונך למפות בניית מערכי נתונים ועיבוד מקדים ללקוחות.
    • תוצאות זה אתה עובר פשוט רשימה של client_ids ישירות חישוב Federated שלך.
    • דחיפה של בניית מערכי נתונים ועיבוד מקדים ללקוחות מונעת צווארי בקבוק בסריאליזציה, ומגדילה משמעותית את הביצועים עם מאות עד אלפי לקוחות.

הגדר סביבת קוד פתוח

ייבוא ​​חבילות

מניפולציה של אובייקט ClientData

בואו נתחיל בכך העמסה לחקור EMNIST של TFF ClientData :

client_data, _ = tff.simulation.datasets.emnist.load_data()
Downloading emnist_all.sqlite.lzma: 100%|██████████| 170507172/170507172 [00:19<00:00, 8831921.67it/s]
2021-10-01 11:17:58.718735: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

בדיקת הנתונים הראשונים יכולה לספר לנו איזה סוג של דוגמא נמצאות ClientData .

first_client_id = client_data.client_ids[0]
first_client_dataset = client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
# This information is also available as a `ClientData` property:
assert client_data.element_type_structure == first_client_dataset.element_spec
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])

שים לב כי התשואות במערך collections.OrderedDict חפצות שיש pixels ו label מפתחות, שבו פיקסלים הוא מותח עם צורה [28, 28] . נניח שברצוננו לשטח תשומות שלנו אל הצורה [784] . דרך אחת אפשרית שאנחנו יכולים לעשות זה יהיה ליישם פונקציה עיבוד מראש כדי שלנו ClientData האובייקט.

def preprocess_dataset(dataset):
  """Create batches of 5 examples, and limit to 3 batches."""

  def map_fn(input):
    return collections.OrderedDict(
        x=tf.reshape(input['pixels'], shape=(-1, 784)),
        y=tf.cast(tf.reshape(input['label'], shape=(-1, 1)), tf.int64),
    )

  return dataset.batch(5).map(
      map_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE).take(5)


preprocessed_client_data = client_data.preprocess(preprocess_dataset)

# Notice that we have both reshaped and renamed the elements of the ordered dict.
first_client_dataset = preprocessed_client_data.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

ייתכן שנרצה בנוסף לבצע עיבוד מוקדם יותר מורכב (ואולי מצבי), למשל ערבוב.

def preprocess_and_shuffle(dataset):
  """Applies `preprocess_dataset` above and shuffles the result."""
  preprocessed = preprocess_dataset(dataset)
  return preprocessed.shuffle(buffer_size=5)

preprocessed_and_shuffled = client_data.preprocess(preprocess_and_shuffle)

# The type signature will remain the same, but the batches will be shuffled.
first_client_dataset = preprocessed_and_shuffled.create_tf_dataset_for_client(
    first_client_id)
print(first_client_dataset.element_spec)
OrderedDict([('x', TensorSpec(shape=(None, 784), dtype=tf.float32, name=None)), ('y', TensorSpec(shape=(None, 1), dtype=tf.int64, name=None))])

התממשקות עם tff.Computation

עכשיו אנחנו יכולים לבצע כמה מניפולציות בסיסיות עם ClientData חפץ, אנחנו מוכנים עדכון נתונים על tff.Computation . אנו מגדירים tff.templates.IterativeProcess אשר מיישם ממוצעי Federated , ולחקור שיטות שונות של העברתו נתונים.

def model_fn():
  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
  ])
  return tff.learning.from_keras_model(
      model,
      # Note: input spec is the _batched_ shape, and includes the 
      # label tensor which will be passed to the loss function. This model is
      # therefore configured to accept data _after_ it has been preprocessed.
      input_spec=collections.OrderedDict(
          x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
          y=tf.TensorSpec(shape=[None, 1], dtype=tf.int64)),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

trainer = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.01))

לפני שאנו מתחילים לעבוד עם זה IterativeProcess , תגובה אחת על הסמנטיקה של ClientData היא בסדר. ClientData אובייקט מייצג את מכלול האוכלוסייה האפשרית לאימוני Federated, אשר באופן כללי הוא לא זמין לסביבת ביצוע מערכת ייצור FL הוא ספציפי סימולציה. ClientData אכן נותן למשתמש את היכולת מחשוב Federated עוקף לחלוטין ופשוט לאמן מודל בצד השרת כרגיל באמצעות ClientData.create_tf_dataset_from_all_clients .

סביבת הסימולציה של TFF מעמידה את החוקר בשליטה מלאה על הלולאה החיצונית. במיוחד זה מרמז על שיקולים של זמינות לקוח, נשירת לקוח וכו', חייבים להיות מטופלים על ידי המשתמש או סקריפט מנהל ההתקן של Python. לכן, ניתן לומר על נשירת לקוח מודל לדוגמא על ידי התאמת התפלגות הדגימה מעל שלך ClientData's client_ids שמשתמש כאלה עם נתונים יותר (ובהתאמה כבר ריצה חישובים מקומיים) ייבחר עם הסתברות נמוכה.

עם זאת, במערכת מאוחדת אמיתית, לקוחות לא יכולים להיבחר במפורש על ידי מאמן המודלים; בחירת הלקוחות מואצלת למערכת שמבצעת את החישוב המאוחד.

עובר tf.data.Datasets ישירות TFF

אפשרות אחת יש לנו להתממשקות בין ClientData וכן IterativeProcess היא בניית tf.data.Datasets בפייתון, ולהעביר מערכי נתונים אלה כדי TFF.

שימו לב שאם אנו משתמשים שעברו עיבוד מקדים שלנו ClientData בבסיסי הנתונים שאנו להניב הם מהסוג המתאים צפוי ידי המודל שלנו כמוגדר לעיל.

selected_client_ids = preprocessed_and_shuffled.client_ids[:10]

preprocessed_data_for_clients = [
    preprocessed_and_shuffled.create_tf_dataset_for_client(
        selected_client_ids[i]) for i in range(10)
]

state = trainer.initialize()
for _ in range(5):
  t1 = time.time()
  state, metrics = trainer.next(state, preprocessed_data_for_clients)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_federated/python/core/impl/compiler/tensorflow_computation_transformations.py:62: extract_sub_graph (from tensorflow.python.framework.graph_util_impl) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.compat.v1.graph_util.extract_sub_graph`
loss 2.9005744457244873, round time 4.576513767242432
loss 3.113278388977051, round time 0.49641919136047363
loss 2.7581865787506104, round time 0.4904160499572754
loss 2.87259578704834, round time 0.48976993560791016
loss 3.1202380657196045, round time 0.6724586486816406

אם ניקח המסלול הזה, עם זאת, לא נוכל לעבור טריוויאלית כדי סימולציה multimachine. מערכי הנתונים אנו בונים את ריצת TensorFlow המקומית יכולים ללכוד מדינה מסביבת פיתון שמסביב, וגם להיכשל בהמשכים או deserialization כשהם מנסים מצב התייחסות אשר איננו זמין להם. זה יכול להתבטא למשל שגיאה מובנת מן של TensorFlow tensor_util.cc :

Check failed: DT_VARIANT == input.dtype() (21 vs. 20)

מיפוי בנייה ועיבוד מקדים על פני הלקוחות

כדי למנוע בעיה זו, TFF ממליצה למשתמשים שלה כדי לשקול במערך אדגום וביצוע עיבוד מקדים כמשהו שקורה באופן מקומי על כל לקוח, ולהשתמש העוזרים של TFF או federated_map לרוץ זה קוד עיבוד מקדים במפורש על כל לקוח.

מבחינה קונספטואלית, הסיבה להעדפת זה ברורה: בזמן הריצה המקומי של TFF, ללקוחות יש רק "בטעות" גישה לסביבת Python העולמית בשל העובדה שכל התזמור המאוחד מתרחש על מכונה אחת. ראוי לציין בשלב זה שחשיבה דומה מולידה את הפילוסופיה הפונקציונלית חוצת הפלטפורמות, הניתנת תמיד להמשכה, של TFF.

TFF מבצע שינוי כזה פשוט באמצעות ClientData's תכונה dataset_computation , A tff.Computation אשר לוקח client_id ומחזירה את קשורה tf.data.Dataset .

שים לב preprocess פשוט עובד עם dataset_computation ; dataset_computation התכונה של מעובדי ClientData משלבת בצנרת המקדימה כול אנחנו פשוט מוגדרות:

print('dataset computation without preprocessing:')
print(client_data.dataset_computation.type_signature)
print('\n')
print('dataset computation with preprocessing:')
print(preprocessed_and_shuffled.dataset_computation.type_signature)
dataset computation without preprocessing:
(string -> <label=int32,pixels=float32[28,28]>*)


dataset computation with preprocessing:
(string -> <x=float32[?,784],y=int64[?,1]>*)

יכולנו להפעיל dataset_computation ולקבל במערך להוט על ריצת Python, אבל הכח האמיתי של גישה זו מבוצע כאשר אנו להלחין עם תהליך שחוזר על עצמו או חישוב אחר, כדי למנוע התממשות מערכי נתונים אלה ריצה העולמי הלהוט בכלל. TFF מספק פונקציה עוזר tff.simulation.compose_dataset_computation_with_iterative_process אשר ניתן להשתמש בהם כדי לעשות בדיוק את זה.

trainer_accepting_ids = tff.simulation.compose_dataset_computation_with_iterative_process(
    preprocessed_and_shuffled.dataset_computation, trainer)

שניהם זה tff.templates.IterativeProcesses ואת אחד מעל לרוץ באותה דרך; אבל לשעבר מקבל מערכי נתוני לקוח שעברו עיבוד ראשוניים, ולאחר שהאחרון מקבל מחרוזות המייצגות מזהות לקוח, טיפול שני בנייה נתון וביצוע עיבוד מקדימים בגוף שלה - למעשה state יכולה להיות מועברת בין שתיים.

for _ in range(5):
  t1 = time.time()
  state, metrics = trainer_accepting_ids.next(state, selected_client_ids)
  t2 = time.time()
  print('loss {}, round time {}'.format(metrics['train']['loss'], t2 - t1))
loss 2.8417396545410156, round time 1.6707067489624023
loss 2.7670371532440186, round time 0.5207102298736572
loss 2.665048122406006, round time 0.5302855968475342
loss 2.7213189601898193, round time 0.5313887596130371
loss 2.580148935317993, round time 0.5283482074737549

קנה מידה למספר רב של לקוחות

trainer_accepting_ids יכול לשמש מייד ריצת multimachine של TFF, ונמנע להתממש tf.data.Datasets ואת הבקר (ולכן בהמשכים אותם שולחים אותם לפועלים).

זה מזרז באופן משמעותי סימולציות מבוזרות, במיוחד עם מספר רב של לקוחות, ומאפשר צבירה ביניים כדי למנוע תקורה של סריאליזציה/דה-סריאליזציה דומה.

צניחה עמוקה אופציונלית: חיבור ידני של לוגיקה של עיבוד מקדים ב-TFF

TFF מיועד לקומפוזיציות מהיסוד; סוג הקומפוזיציה שבוצע זה עתה על ידי העוזר של TFF נמצא בשליטה מלאה שלנו כמשתמשים. אנחנו יכולים להיות להלחין בחישוב המקדים ידני רק הגדרנו עם המאמן עצמו next בפשטות:

selected_clients_type = tff.FederatedType(preprocessed_and_shuffled.dataset_computation.type_signature.parameter, tff.CLIENTS)

@tff.federated_computation(trainer.next.type_signature.parameter[0], selected_clients_type)
def new_next(server_state, selected_clients):
  preprocessed_data = tff.federated_map(preprocessed_and_shuffled.dataset_computation, selected_clients)
  return trainer.next(server_state, preprocessed_data)

manual_trainer_with_preprocessing = tff.templates.IterativeProcess(initialize_fn=trainer.initialize, next_fn=new_next)

למעשה, זה למעשה מה שעוזר שהשתמשנו בו עושה מתחת למכסה המנוע (ובנוסף ביצוע בדיקת סוג מתאים ומניפולציה). יכולנו אפילו הביעו את אותה לוגיקה מעט שונה, על ידי בהמשכים preprocess_and_shuffle לתוך tff.Computation , וכן לפירוק federated_map לתוך צעד אחד אשר בונה מערכי נתונים-מעובדים האו"ם ועוד המפעילה preprocess_and_shuffle על כל לקוח.

אנו יכולים לוודא שהנתיב היותר ידני זה מביא לחישובים עם חתימת סוג זהה לזה של המסייע של TFF (שמות פרמטרים מודולו):

print(trainer_accepting_ids.next.type_signature)
print(manual_trainer_with_preprocessing.next.type_signature)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,federated_dataset={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)
(<server_state=<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,selected_clients={string}@CLIENTS> -> <<model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER,<broadcast=<>,aggregation=<mean_value=<>,mean_weight=<>>,train=<sparse_categorical_accuracy=float32,loss=float32>,stat=<num_examples=int64>>@SERVER>)