Google I/O הוא עטיפה! התעדכן בהפעלות של TensorFlow. צפה בהפעלות

למידה מאוחדת לסיווג תמונות

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

במדריך זה, אנו נשתמש בדוגמא הכשרת MNIST הקלסית להציג את למידת Federated (FL) API שכבת TFF, tff.learning - סט של ממשקים ברמה גבוהה שיכול לשמש כדי לבצע סוגים נפוצים של משימות למידת Federated, כגון הכשרה מאוחדת, כנגד מודלים שסופקו על ידי המשתמש המיושמים ב- TensorFlow.

מדריך זה, וה-Federated Learning API, מיועדים בעיקר למשתמשים שרוצים לחבר דגמי TensorFlow משלהם ל-TFF, ומתייחסים לאחרון בעיקר כאל קופסה שחורה. כדי לקבל הסבר מעמיק יותר של TFF וכיצד ליישם אלגוריתמים של למידה Federated משלך, לראות את הדרכות על ה- API Core FC - Custom Federated אלגוריתמים חלק 1 ו חלק 2 .

למידע נוסף על tff.learning , להמשיך עם למידת Federated עבור טקסט הדור , הדרכה אשר בנוסף מכסה דגמים חוזרים, גם מדגים טעינת מודל Keras בהמשכים מאומן מראש עבור עידון עם למידת Federated בשילוב עם הערכה באמצעות Keras.

לפני שאנחנו מתחילים

לפני שנתחיל, אנא הפעל את הפעולות הבאות כדי לוודא שהסביבה שלך מוגדרת כהלכה. אם אינך רואה ברכה, עיין התקנת המדריך לקבלת הוראות.

# tensorflow_federated_nightly also bring in tf_nightly, which
# can causes a duplicate tensorboard install, leading to errors.
!pip uninstall --yes tensorboard tb-nightly

!pip install --quiet --upgrade tensorflow-federated-nightly
!pip install --quiet --upgrade nest-asyncio
!pip install --quiet --upgrade tb-nightly  # or tensorboard, but not both

import nest_asyncio
nest_asyncio.apply()
%load_ext tensorboard
import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

הכנת נתוני הקלט

נתחיל עם הנתונים. למידה מאוחדת דורשת מערך נתונים מאוחד, כלומר אוסף נתונים ממספר משתמשים. נתוני Federated הוא בדרך כלל בלתי iid , אשר מציב מערך ייחודי של אתגרים.

על מנת להקל על ניסויים, זרענו למאגר TFF עם כמה מערכי נתונים, כולל גרסה Federated של MNIST המכילה גירסה של הנתונים NIST המקורי כי כבר מחדש מעובד באמצעות ליף כך שהנתונים היא רגש על ידי הסופר המקורי של הספרות. מכיוון שלכל כותב יש סגנון ייחודי, מערך נתונים זה מציג את סוג ההתנהגות הלא-iid המצופה ממערכי נתונים מאוחדים.

הנה איך אנחנו יכולים לטעון אותו.

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

מערכי נתונים שמחזיר load_data() מקרים של tff.simulation.ClientData , ממשק המאפשר לך למנות את קבוצת המשתמשים, כדי לבנות tf.data.Dataset שמייצג את הנתונים של משתמש מסוים, וכדי שאילתה מבנה של אלמנטים בודדים. הנה איך אתה יכול להשתמש בממשק זה כדי לחקור את התוכן של מערך הנתונים. זכור שבעוד שממשק זה מאפשר לך לעבור על מזהי לקוחות, זוהי רק תכונה של נתוני הסימולציה. כפי שתראה בקרוב, זהויות הלקוח אינן בשימוש על ידי מסגרת הלמידה המאוחדת - מטרתן היחידה היא לאפשר לך לבחור תת-קבוצות של הנתונים עבור סימולציות.

len(emnist_train.client_ids)
3383
emnist_train.element_type_structure
OrderedDict([('label', TensorSpec(shape=(), dtype=tf.int32, name=None)), ('pixels', TensorSpec(shape=(28, 28), dtype=tf.float32, name=None))])
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = next(iter(example_dataset))

example_element['label'].numpy()
1
from matplotlib import pyplot as plt
plt.imshow(example_element['pixels'].numpy(), cmap='gray', aspect='equal')
plt.grid(False)
_ = plt.show()

png

חקר הטרוגניות בנתונים מאוחדים

נתוני Federated הוא בדרך כלל בלתי iid , בדרך כלל יש משתמשי הפצות שונות של נתונים בהתאם לדפוסי שימוש. ייתכן שלחלק מהלקוחות יהיו פחות דוגמאות הדרכה במכשיר, הסובלים ממחסור בנתונים מקומית, בעוד שללקוחות מסוימים יהיו די והותר דוגמאות להדרכה. בואו נחקור את הרעיון הזה של הטרוגניות נתונים האופיינית למערכת מאוחדת עם נתוני ה-EMNIST שיש לנו. חשוב לציין שהניתוח העמוק הזה של נתוני הלקוח זמין רק לנו מכיוון שמדובר בסביבת סימולציה שבה כל הנתונים זמינים לנו באופן מקומי. בסביבה מאוחדת של ייצור אמיתית לא תוכל לבדוק נתונים של לקוח אחד.

ראשית, בואו ניקח דגימה של נתוני לקוח אחד כדי לקבל תחושה של הדוגמאות במכשיר מדומה אחד. מכיוון שמערך הנתונים בו אנו משתמשים נכתב על ידי כותב ייחודי, הנתונים של לקוח אחד מייצגים את כתב היד של אדם אחד עבור מדגם של הספרות 0 עד 9, המדמים את "דפוס השימוש" הייחודי של משתמש אחד.

## Example MNIST digits for one client
figure = plt.figure(figsize=(20, 4))
j = 0

for example in example_dataset.take(40):
  plt.subplot(4, 10, j+1)
  plt.imshow(example['pixels'].numpy(), cmap='gray', aspect='equal')
  plt.axis('off')
  j += 1

png

כעת בואו נראה את מספר הדוגמאות בכל לקוח עבור כל תווית ספרות של MNIST. בסביבה המאוחדת, מספר הדוגמאות בכל לקוח יכול להשתנות לא מעט, בהתאם להתנהגות המשתמש.

# Number of examples per layer for a sample of clients
f = plt.figure(figsize=(12, 7))
f.suptitle('Label Counts for a Sample of Clients')
for i in range(6):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    # Append counts individually per label to make plots
    # more colorful instead of one color per plot.
    label = example['label'].numpy()
    plot_data[label].append(label)
  plt.subplot(2, 3, i+1)
  plt.title('Client {}'.format(i))
  for j in range(10):
    plt.hist(
        plot_data[j],
        density=False,
        bins=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

png

כעת בואו נראה את התמונה הממוצעת לכל לקוח עבור כל תווית MNIST. קוד זה יפיק את הממוצע של כל ערך פיקסל עבור כל הדוגמאות של המשתמש עבור תווית אחת. אנו נראה שהתמונה הממוצעת של לקוח אחד עבור ספרה תיראה שונה מהתמונה הממוצעת של לקוח אחר עבור אותה ספרה, בשל סגנון כתב היד הייחודי של כל אדם. אנו יכולים להרהר כיצד כל סבב הדרכה מקומי ידחוף את המודל לכיוון שונה על כל לקוח, מכיוון שאנו לומדים מהנתונים הייחודיים של אותו משתמש באותו סבב מקומי. בהמשך המדריך נראה כיצד נוכל לקחת כל עדכון למודל מכל הלקוחות ולרכז אותם יחד למודל הגלובלי החדש שלנו, שלמד מכל אחד מהנתונים הייחודיים של הלקוח שלנו.

# Each client has different mean images, meaning each client will be nudging
# the model in their own directions locally.

for i in range(5):
  client_dataset = emnist_train.create_tf_dataset_for_client(
      emnist_train.client_ids[i])
  plot_data = collections.defaultdict(list)
  for example in client_dataset:
    plot_data[example['label'].numpy()].append(example['pixels'].numpy())
  f = plt.figure(i, figsize=(12, 5))
  f.suptitle("Client #{}'s Mean Image Per Label".format(i))
  for j in range(10):
    mean_img = np.mean(plot_data[j], 0)
    plt.subplot(2, 5, j+1)
    plt.imshow(mean_img.reshape((28, 28)))
    plt.axis('off')

png

png

png

png

png

נתוני משתמש יכולים להיות רועשים ולא מסומנים בצורה לא מהימנה. לדוגמה, בהסתכלות על הנתונים של לקוח מס' 2 לעיל, אנו יכולים לראות כי עבור תווית 2, ייתכן שהיו כמה דוגמאות עם תיוג שגוי היוצרות תמונה ממוצעת רועשת יותר.

עיבוד מוקדם של נתוני הקלט

מאחר והנתונים כבר tf.data.Dataset , המקדים ניתן להשיג באמצעות טרנספורמציות בסיס הנתונים. הנה, אנחנו לשטח את 28x28 תמונות לתוך 784 מערכים -element, דשדוש דוגמאות בודדות, לארגן אותם בקבוצות, וכן לשנות את תכונות מתוך pixels ו label כדי x ו- y לשימוש עם Keras. אנחנו גם לזרוק repeat על סט נתונים לרוץ תקופות אחדות.

NUM_CLIENTS = 10
NUM_EPOCHS = 5
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

def preprocess(dataset):

  def batch_format_fn(element):
    """Flatten a batch `pixels` and return the features as an `OrderedDict`."""
    return collections.OrderedDict(
        x=tf.reshape(element['pixels'], [-1, 784]),
        y=tf.reshape(element['label'], [-1, 1]))

  return dataset.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(
      BATCH_SIZE).map(batch_format_fn).prefetch(PREFETCH_BUFFER)

בואו נוודא שזה עבד.

preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(lambda x: x.numpy(),
                                     next(iter(preprocessed_example_dataset)))

sample_batch
OrderedDict([('x', array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]], dtype=float32)), ('y', array([[2],
       [1],
       [5],
       [7],
       [1],
       [7],
       [7],
       [1],
       [4],
       [7],
       [4],
       [2],
       [2],
       [5],
       [4],
       [1],
       [1],
       [0],
       [0],
       [9]], dtype=int32))])

יש לנו כמעט את כל אבני הבניין כדי לבנות מערכי נתונים מאוחדים.

אחת הדרכים כדי להאכיל נתונים Federated כדי TFF בסימולציה הוא פשוט כרשימה Python, עם כל רכיב של הרשימה מחזיק את הנתונים של משתמש בודד, אם כרשימה או בתור tf.data.Dataset . מכיוון שכבר יש לנו ממשק שמספק את האחרון, בואו נשתמש בו.

הנה פונקציית עוזר פשוטה שתבנה רשימה של מערכי נתונים מקבוצת המשתמשים הנתונה כקלט לסבב הדרכה או הערכה.

def make_federated_data(client_data, client_ids):
  return [
      preprocess(client_data.create_tf_dataset_for_client(x))
      for x in client_ids
  ]

עכשיו, איך אנחנו בוחרים לקוחות?

בתרחיש טיפוסי של הדרכה מאוחדת, אנו מתמודדים עם פוטנציאל לאוכלוסייה גדולה מאוד של מכשירי משתמשים, שרק חלק מהם עשוי להיות זמין להדרכה בנקודת זמן נתונה. זה המקרה, למשל, כאשר מכשירי הלקוח הם טלפונים ניידים המשתתפים באימונים רק כשהם מחוברים למקור מתח, מחוץ לרשת מדורגת, ובחוסר פעילות אחר.

כמובן, אנחנו נמצאים בסביבת סימולציה, וכל הנתונים זמינים באופן מקומי. בדרך כלל, כאשר מריצים סימולציות, היינו פשוט דוגמים תת-קבוצה אקראית של הלקוחות שיהיו מעורבים בכל סבב אימון, בדרך כלל שונה בכל סבב.

עם זאת, כפי שאתה יכול לברר באמצעות לימוד הנייר על ממוצעי Federated האלגוריתם, השיג התכנסות בתוך מערכת עם תת שנדגמו באופן אקראי של לקוחות בכול סיבוב יכול לקחת זמן, וזה יהיה זה מעשי צריך להפעיל מאה סיבובים הדרכה אינטראקטיבית זו.

מה שנעשה במקום זה הוא לדגום את קבוצת הלקוחות פעם אחת, ולהשתמש שוב באותה קבוצה על פני סבבים כדי להאיץ את ההתכנסות (התאמת יתר בכוונה לנתוני המשתמשים המעטים הללו). אנו משאירים את זה כתרגיל לקורא לשנות את המדריך הזה כדי לדמות דגימה אקראית - זה די קל לביצוע (לאחר שעושים זאת, זכור שההתכנסות של המודל עשויה להימשך זמן מה).

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

print('Number of client datasets: {l}'.format(l=len(federated_train_data)))
print('First dataset: {d}'.format(d=federated_train_data[0]))
Number of client datasets: 10
First dataset: <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>

יצירת דגם עם קרס

אם אתה משתמש ב-Keras, סביר להניח שכבר יש לך קוד שבונה מודל של Keras. הנה דוגמה לדגם פשוט שיספיק לצרכים שלנו.

def create_keras_model():
  return tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(784,)),
      tf.keras.layers.Dense(10, kernel_initializer='zeros'),
      tf.keras.layers.Softmax(),
  ])

על מנת להשתמש בכל דגם עם TFF, היא צריכה להיות עטופה מופע של tff.learning.Model ממשק, אשר חושף שיטות להחתים את המסירה קדימה של המודל, המאפיינים metadata, וכו ', בדומה Keras, אלא גם מציגה נוספים אלמנטים, כגון דרכים לשלוט בתהליך של מחשוב מדדים מאוחדים. בוא לא נדאג בקשר לזה לעת עתה; אם יש לך מודל Keras כמו זה רק הגדרנו לעיל, אתה יכול לקבל TFF לעטוף את זה בשבילך על ידי הפנייה tff.learning.from_keras_model , עובר למודל ו אצווה נתוני המדגם כפי טיעונים, כמוצג להלן.

def model_fn():
  # We _must_ create a new model here, and _not_ capture it from an external
  # scope. TFF will call this within different graph contexts.
  keras_model = create_keras_model()
  return tff.learning.from_keras_model(
      keras_model,
      input_spec=preprocessed_example_dataset.element_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

הכשרת המודל על נתונים מאוחדים

עכשיו שיש לנו מודל עטוף כמו tff.learning.Model לשימוש עם TFF, נוכל לתת TFF לבנות אלגוריתם ממוצעים Federated ידי העלאת פונקציה עוזר tff.learning.build_federated_averaging_process , כדלקמן.

זכור כי הטענה צריכה להיות בנאי (כגון model_fn לעיל), לא מופע שכבר נבנו, כך בבניית המודל שלך יכול לקרות בהקשר בשליטת TFF (אם אתה סקרן לגבי הסיבות זו, אנו ממליצים לך לקרוא את המדריך מעקב על אלגוריתמים המנהג ).

הערה ביקורתית אחת על אלגוריתם ממוצעים Federated להלן, ישנם 2 אופטימיזציה: ייעול _client וכן ייעול _SERVER. ייעול _client משמש רק כדי לחשב עדכוני מודל מקומיים על כול לקוח. ייעול _SERVER חל העדכון הממוצע למודל הגלובלי על השרת. בפרט, משמעות הדבר היא שהבחירה באופטימיזציה וקצב הלמידה בשימוש עשויות להיות שונות מאלה שבהן השתמשת כדי לאמן את המודל על מערך נתונים סטנדרטי של iid. אנו ממליצים להתחיל עם SGD רגיל, אולי עם קצב למידה קטן מהרגיל. קצב הלמידה שאנו משתמשים בו לא כוונן בקפידה, אל תהסס להתנסות.

iterative_process = tff.learning.build_federated_averaging_process(
    model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))

מה קרה הרגע? TFF בנתה זוג חישובים Federated וארוז אותם לתוך tff.templates.IterativeProcess שבה חישובים אלה זמינים כמו זוג המאפיינים initialize ו next .

בקיצור, חישובי Federated הן תוכניות בשפה הפנימית של TFF שיכול להביע אלגוריתמי Federated שונים (אתה יכול למצוא עוד על זה המנהג אלגוריתמי הדרכה). במקרה זה, שני החישובים שנוצרו ונארזו iterative_process ליישם ממוצעי Federated .

מטרתו של TFF היא להגדיר חישובים באופן שיוכלו להתבצע בהגדרות למידה מאוחדות אמיתיות, אך כרגע רק זמן ריצה של סימולציית ביצוע מקומי מיושם. כדי לבצע חישוב בסימולטור, אתה פשוט מפעיל אותו כמו פונקציית Python. הסביבה המתפרשת כברירת מחדל אינה מיועדת לביצועים גבוהים, אך היא תספיק עבור הדרכה זו; אנו מצפים לספק זמני ריצה של סימולציה בעלי ביצועים גבוהים יותר כדי להקל על מחקר בקנה מידה גדול יותר במהדורות עתידיות.

בואו נתחיל עם initialize החישוב. כמו במקרה של כל החישובים המאוחדים, אתה יכול לחשוב על זה כעל פונקציה. החישוב אינו לוקח ארגומנטים, ומחזיר תוצאה אחת - הייצוג של מצב תהליך הממוצע הפדרציה בשרת. אמנם אנחנו לא רוצים לצלול לפרטים של TFF, אבל זה עשוי להיות מאלף לראות איך המדינה הזו נראית. אתה יכול לדמיין את זה באופן הבא.

str(iterative_process.initialize.type_signature)
'( -> <model=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'

בעוד חתימה מסוג הנ"ל עלולה תחילה להיראות סתום קצת, אתה יכול לזהות כי מדינת שרת מורכב model (פרמטרים במודל הראשוניים MNIST שיחולקו לכול מכשירים), ו optimizer_state (מידע נוסף ומתוחזק על ידי השרת, כגון מספר הסיבובים לשימוש עבור לוחות זמנים היפרפרמטרים וכו').

בואו להפעיל את initialize חישוב לבנות למצב השרת.

state = iterative_process.initialize()

השני של זוג חישובי Federated, next , מייצגת סיבוב בודד שקלול Federated, אשר מורכב של דחיפה למצב השרת (כולל הפרמטרים במודל) ללקוחות, על-מכשיר אימון על הנתונים המקומיים שלהם, איסוף עדכוני מודל המיצוע , והפקת דגם חדש מעודכן בשרת.

מבחינה מושגית, אתה יכול לחשוב על next כבעל חתימת סוג פונקציונלית כי נראה כדלקמן.

SERVER_STATE, FEDERATED_DATA -> SERVER_STATE, TRAINING_METRICS

בפרט, אחד צריך לחשוב על next() לא כמו להיות פונקציה ריצות בשרת, אלא להיות ייצוג פונקציונלי הצהרתי של החישוב המבוזר כולו - חלק תשומות ניתנים על ידי השרת ( SERVER_STATE ), אבל כל המשתתפים המכשיר תורם מערך נתונים מקומי משלו.

בואו נרוץ סבב אימון בודד ונראה את התוצאות. אנו יכולים להשתמש בנתונים המאוחדים שכבר יצרנו למעלה עבור מדגם של משתמשים.

state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.12345679), ('loss', 3.1193738)])), ('stat', OrderedDict([('num_examples', 4860)]))])

בוא נרוץ עוד כמה סיבובים. כפי שצוין קודם לכן, בדרך כלל בשלב זה תבחר תת-קבוצה של נתוני הסימולציה שלך ממדגם חדש שנבחר באקראי של משתמשים עבור כל סבב על מנת לדמות פריסה מציאותית שבה משתמשים באים והולכים ללא הרף, אך במחברת אינטראקטיבית זו, עבור לשם ההדגמה, פשוט נעשה שימוש חוזר באותם משתמשים, כך שהמערכת תתכנס במהירות.

NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.13518518), ('loss', 2.9834728)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.14382716), ('loss', 2.861665)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.17407407), ('loss', 2.7957022)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.19917695), ('loss', 2.6146567)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.21975309), ('loss', 2.529761)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2409465), ('loss', 2.4053504)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.2611111), ('loss', 2.315389)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.30823046), ('loss', 2.1240263)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('sparse_categorical_accuracy', 0.33312756), ('loss', 2.1164262)])), ('stat', OrderedDict([('num_examples', 4860)]))])

אובדן האימונים הולך ופוחת לאחר כל סבב של אימונים מאוחדים, מה שמצביע על כך שהמודל מתכנס. ישנן כמה נקודות חשובות עם מדדים אימונים אלה, אולם, עיין בסעיף על הערכת מאוחר יותר במדריך זה.

הצגת מדדי מודל ב-TensorBoard

לאחר מכן, בואו נדמיין את המדדים מהחישובים המאוחדים הללו באמצעות Tensorboard.

נתחיל ביצירת המדריך וכותב הסיכום המתאים לכתוב את המדדים.

logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

תכנן את המדדים הסקלרים הרלוונטיים עם אותו כותב סיכום.

with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics['train'].items():
      tf.summary.scalar(name, value, step=round_num)

הפעל את TensorBoard עם ספריית יומן השורש שצוינה למעלה. ייתכן שיחלפו מספר שניות עד שהנתונים ייטענו.

!ls {logdir}
%tensorboard --logdir {logdir} --port=0
events.out.tfevents.1629557449.ebe6e776479e64ea-4903924a278.borgtask.google.com.458912.1.v2
Launching TensorBoard...
Reusing TensorBoard on port 50681 (pid 292785), started 0:30:30 ago. (Use '!kill 292785' to kill it.)
<IPython.core.display.Javascript at 0x7fd6617e02d0>
# Uncomment and run this this cell to clean your directory of old output for
# future graphs from this directory. We don't run it by default so that if 
# you do a "Runtime > Run all" you don't lose your results.

# !rm -R /tmp/logs/scalars/*

על מנת להציג מדדי הערכה באותו אופן, אתה יכול ליצור תיקיית eval נפרדת, כמו "לוגים/סקלרים/השוואה", כדי לכתוב ל-TensorBoard.

התאמה אישית של יישום המודל

Keras הוא API מודל ברמה גבוהה המומלצת עבור TensorFlow , ואנו מעודדים באמצעות מודלים Keras (דרך tff.learning.from_keras_model ) ב TFF בכל הזדמנות אפשרית.

עם זאת, tff.learning מספק ממשק מודל ברמה נמוכה יותר, tff.learning.Model , החושף את פונקציונליות המינימאלית הכרחית באמצעות מודל ללמידת Federated. ישירות ליישום ממשק זה (ואולי עדיין באמצעות אבני בניין כמו tf.keras.layers ) מאפשר התאמה אישית מקסימלית ללא שינוי על הרכיבים הפנימיים של אלגוריתמים של למידה Federated.

אז בואו נעשה הכל שוב מאפס.

הגדרת משתני מודל, מעבר קדימה ומדדים

הצעד הראשון הוא לזהות את משתני TensorFlow איתם אנחנו הולכים לעבוד. על מנת להפוך את הקוד הבא לקריאה יותר, הבה נגדיר מבנה נתונים שיייצג את כל הסט. זה יכלול משתנה כגון weights ו bias שאנחנו נכשיר, כמו גם משתנים יקיימו סטטיסטיקה ומונה המצטבר שונים נעדכן במהלך אימון, כגון loss_sum , accuracy_sum , ו num_examples .

MnistVariables = collections.namedtuple(
    'MnistVariables', 'weights bias num_examples loss_sum accuracy_sum')

הנה שיטה שיוצרת את המשתנים. למען הפשטות, אנחנו מייצגים את כל הסטטיסטיקות כמו tf.float32 , כמו זה יבטל את הצורך בהמרות סוג בשלב מאוחר יותר. גלישת initializers משתנה כפי lambdas היא דרישה שמטילה משתנים משאב .

def create_mnist_variables():
  return MnistVariables(
      weights=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(784, 10)),
          name='weights',
          trainable=True),
      bias=tf.Variable(
          lambda: tf.zeros(dtype=tf.float32, shape=(10)),
          name='bias',
          trainable=True),
      num_examples=tf.Variable(0.0, name='num_examples', trainable=False),
      loss_sum=tf.Variable(0.0, name='loss_sum', trainable=False),
      accuracy_sum=tf.Variable(0.0, name='accuracy_sum', trainable=False))

עם המשתנים של פרמטרי מודל וסטטיסטיקה מצטברת, כעת נוכל להגדיר את שיטת העברה קדימה המחשבת אובדן, פולטת תחזיות ומעדכנת את הסטטיסטיקה המצטברת עבור אצווה אחת של נתוני קלט, כדלקמן.

def predict_on_batch(variables, x):
  return tf.nn.softmax(tf.matmul(x, variables.weights) + variables.bias)

def mnist_forward_pass(variables, batch):
  y = predict_on_batch(variables, batch['x'])
  predictions = tf.cast(tf.argmax(y, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  loss = -tf.reduce_mean(
      tf.reduce_sum(tf.one_hot(flat_labels, 10) * tf.math.log(y), axis=[1]))
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

לאחר מכן, אנו מגדירים פונקציה שמחזירה קבוצה של מדדים מקומיים, שוב באמצעות TensorFlow. אלו הם הערכים (בנוסף לעדכוני המודל, המטופלים באופן אוטומטי) שמתאימים להצטבר לשרת בתהליך למידה או הערכה מאוחדים.

הנה, אנחנו פשוט לחזור לממוצע loss ו accuracy , כמו גם את num_examples , אשר נצטרך לשקלל את התרומות בצורה נכונה ממשתמשים שונים בעת חישוב מצרפי Federated.

def get_local_mnist_metrics(variables):
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=variables.loss_sum / variables.num_examples,
      accuracy=variables.accuracy_sum / variables.num_examples)

לבסוף, אנחנו צריכים לקבוע כיצד לקבץ את המדדים המקומיים הנפלט כל מכשיר באמצעות get_local_mnist_metrics . זהו החלק היחיד של קוד שאינו כתוב ב TensorFlow - זהו חישוב Federated לידי ביטוי TFF. אם אתה רוצה לחפור עמוק יותר, לרפרף על מנהג אלגוריתמי הדרכה, אבל ברוב היישומים, אתה לא באמת צריך; גרסאות של הדפוס המוצג להלן אמורות להספיק. כך זה נראה:

@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  return collections.OrderedDict(
      num_examples=tff.federated_sum(metrics.num_examples),
      loss=tff.federated_mean(metrics.loss, metrics.num_examples),
      accuracy=tff.federated_mean(metrics.accuracy, metrics.num_examples))

קלט metrics התואם הוויכוח על OrderedDict שמחזיר get_local_mnist_metrics לעיל, אך באופן ביקורתי את הערכים אינם tf.Tensors - הם "התאגרפו" כפי tff.Value ים, כדי להבהיר לך כבר לא יכול לתפעל אותם באמצעות TensorFlow, אבל רק באמצעות מפעילים Federated של TFF כמו tff.federated_mean ו tff.federated_sum . המילון המוחזר של אגרגטים גלובליים מגדיר את קבוצת המדדים שתהיה זמינה בשרת.

בניית מופע של tff.learning.Model

עם כל האמור לעיל, אנו מוכנים לבנות ייצוג מודל לשימוש עם TFF דומה לזה שנוצר עבורך כאשר אתה נותן ל-TFF להטמיע מודל של Keras.

from typing import Callable, List, OrderedDict

class MnistModel(tff.learning.Model):

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    return [self._variables.weights, self._variables.bias]

  @property
  def non_trainable_variables(self):
    return []

  @property
  def local_variables(self):
    return [
        self._variables.num_examples, self._variables.loss_sum,
        self._variables.accuracy_sum
    ]

  @property
  def input_spec(self):
    return collections.OrderedDict(
        x=tf.TensorSpec([None, 784], tf.float32),
        y=tf.TensorSpec([None, 1], tf.int32))

  @tf.function
  def predict_on_batch(self, x, training=True):
    del training
    return predict_on_batch(self._variables, x)

  @tf.function
  def forward_pass(self, batch, training=True):
    del training
    loss, predictions = mnist_forward_pass(self._variables, batch)
    num_exmaples = tf.shape(batch['x'])[0]
    return tff.learning.BatchOutput(
        loss=loss, predictions=predictions, num_examples=num_exmaples)

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

  @tf.function
  def report_local_unfinalized_metrics(
      self) -> OrderedDict[str, List[tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to unfinalized values."""
    return collections.OrderedDict(
        num_examples=[self._variables.num_examples],
        loss=[self._variables.loss_sum, self._variables.num_examples],
        accuracy=[self._variables.accuracy_sum, self._variables.num_examples])

  def metric_finalizers(
      self) -> OrderedDict[str, Callable[[List[tf.Tensor]], tf.Tensor]]:
    """Creates an `OrderedDict` of metric names to finalizers."""
    return collections.OrderedDict(
        num_examples=tf.function(func=lambda x: x[0]),
        loss=tf.function(func=lambda x: x[0] / x[1]),
        accuracy=tf.function(func=lambda x: x[0] / x[1]))

כפי שניתן לראות, שיטות מופשטות ומאפיינים מוגדרים על ידי tff.learning.Model תואמת את קטעי קוד בחלק הקודם כי הציג את המשתנים המוגדרים אובדן וסטטיסטיקה.

הנה כמה נקודות שכדאי להדגיש:

  • כל המדינה כי המודל שלך ישתמשו חייב להיות בשבי כמשתנים TensorFlow, כפי TFF אינו משתמש Python בזמן ריצה (זוכרים את הקוד שלך צריך להיות כתוב כך שניתן יהיה להפעיל למכשירים ניידים; לראות את המנהג אלגוריתמים הדרכה עבור מעמיק יותר פרשנות על הסיבות).
  • המודל שלך צריך לתאר מה צורה של הנתונים שהוא מקבל ( input_spec ), כמו בכלל, TFF היא סביבה-מאופיינות חזק ורוצה לקבוע חתימות הסוגים של כל הרכיבים. הצהרת הפורמט של הקלט של הדגם שלך היא חלק חיוני ממנו.
  • למרות שטכנית לא נדרש, אנו ממליצים לפפה כל היגיון TensorFlow (pass קדימה, בחישובי ערכים, וכו ') כמו tf.function ים, כמו זה עוזר להבטיח את TensorFlow ניתן בהמשכים, ומסיר את הצורך תלות שליטה מפורשת.

האמור לעיל מספיק להערכה ואלגוריתמים כמו Federated SGD. עם זאת, עבור ממוצע פדרלי, עלינו לציין כיצד המודל צריך להתאמן באופן מקומי בכל אצווה. אנו נציין אופטימיזציה מקומית בעת בניית אלגוריתם הממוצע הפדרציה.

הדמיית אימון מאוחד עם הדגם החדש

עם כל האמור לעיל, שאר התהליך נראה כמו מה שכבר ראינו - פשוט החלף את בנאי המודל בבנאי של מחלקת המודלים החדשה שלנו, והשתמש בשני החישובים המאוחדים בתהליך האיטרטיבי שיצרת כדי לעבור במחזוריות סבבי אימון.

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02))
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.0708053), ('accuracy', 0.12777779)])), ('stat', OrderedDict([('num_examples', 4860)]))])
for round_num in range(2, 11):
  state, metrics = iterative_process.next(state, federated_train_data)
  print('round {:2d}, metrics={}'.format(round_num, metrics))
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 3.011699), ('accuracy', 0.13024691)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.7408307), ('accuracy', 0.15576132)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.6761012), ('accuracy', 0.17921811)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  5, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.675567), ('accuracy', 0.1855967)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  6, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.5664043), ('accuracy', 0.20329218)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  7, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.4179392), ('accuracy', 0.24382716)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  8, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.3237286), ('accuracy', 0.26687244)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round  9, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.1861682), ('accuracy', 0.28209877)])), ('stat', OrderedDict([('num_examples', 4860)]))])
round 10, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('train', OrderedDict([('num_examples', 4860.0), ('loss', 2.046388), ('accuracy', 0.32037038)])), ('stat', OrderedDict([('num_examples', 4860)]))])

כדי לראות מדדים אלה בתוך TensorBoard, עיין בשלבים המפורטים למעלה ב"הצגת מדדי מודל ב- TensorBoard".

הַעֲרָכָה

כל הניסויים שלנו עד כה הציגו רק מדדי אימון מאוחדים - המדדים הממוצעים על כל קבוצות הנתונים שהוכשרו על פני כל הלקוחות בסבב. זה מציג את החששות הנורמליים לגבי התאמת יתר, במיוחד מכיוון שהשתמשנו באותה קבוצה של לקוחות בכל סבב למען הפשטות, אבל יש רעיון נוסף של התאמת יתר במדדי אימון ספציפיים לאלגוריתם הממוצע הפדרציה. זה הכי קל לראות אם אנו מדמיינים שלכל לקוח יש אצווה בודדת של נתונים, ואנו מתאמנים על אצווה זו במשך איטרציות רבות (תקופות). במקרה זה, המודל המקומי יתאים במהירות בדיוק לאותה אצווה אחת, ולכן מדד הדיוק המקומי שאנו ממוצעים יתקרב ל-1.0. לפיכך, ניתן לקחת את מדדי האימון הללו כסימן לכך שהאימונים מתקדמים, אך לא הרבה יותר.

כדי לבצע הערכה על נתונים מאוחדים, אתה יכול לבנות עוד חישוב Federated נועד בדיוק למטרה זו, באמצעות tff.learning.build_federated_evaluation הפונקציה, ו חולף בנאי המודל שלך כטיעון. לידיעתך, בשונה עם ממוצעים Federated, שבו השתמשנו MnistTrainableModel , זה מספיק כדי לעבור את MnistModel . הערכה לא מבצעת ירידה בשיפוע, ואין צורך לבנות מטובי.

עבור ניסויים ומחקר, כאשר נתון מבחן מרוכזים נגישים, Federated למידה עבור טקסט הדור ממחישה עוד אפשרות הערכה: לקחת את המשקולות המאומנות מלמידת Federated, וליישם אותם למודל Keras סטנדרטי, ולאחר מכן פשוט קורא tf.keras.models.Model.evaluate() על בסיס נתון ריכוזיים.

evaluation = tff.learning.build_federated_evaluation(MnistModel)

אתה יכול לבדוק את חתימת הסוג המופשט של פונקציית ההערכה באופן הבא.

str(evaluation.type_signature)
'(<server_model_weights=<trainable=<float32[784,10],float32[10]>,non_trainable=<>>@SERVER,federated_dataset={<x=float32[?,784],y=int32[?,1]>*}@CLIENTS> -> <eval=<num_examples=float32,loss=float32,accuracy=float32>,stat=<num_examples=int64>>@SERVER)'

אין צורך להיות מודאג לגבי הפרטים בשלב זה, רק להיות מודע לכך שזה לוקח את הטופס הכללי הבא, בדומה tff.templates.IterativeProcess.next אבל עם שני הבדלים חשוב. ראשית, אנחנו לא מחזירים את מצב השרת, מכיוון שהערכה לא משנה את המודל או כל היבט אחר של המצב - אתה יכול לחשוב על זה כחסר מצב. שנית, הערכה זקוקה רק למודל, ואינה דורשת שום חלק אחר של מצב השרת שעשוי להיות קשור לאימון, כגון משתני מיטוב.

SERVER_MODEL, FEDERATED_DATA -> TRAINING_METRICS

בואו נפעיל הערכה לגבי המצב האחרון אליו הגענו במהלך האימון. על מנת לחלץ את מודל המודרך האחרון מהמדינה לשרת, אתה פשוט לגשת .model החבר, כדלקמן.

train_metrics = evaluation(state.model, federated_train_data)

הנה מה שאנחנו מקבלים. שימו לב שהמספרים נראים מעט טובים יותר ממה שדווח בסבב האימונים האחרון למעלה. לפי המוסכמה, מדדי האימון המדווחים על ידי תהליך האימון האיטרטיבי משקפים בדרך כלל את ביצועי המודל בתחילת סבב ההכשרה, ולכן מדדי ההערכה יהיו תמיד צעד אחד קדימה.

str(train_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 4860.0), ('loss', 1.7510437), ('accuracy', 0.2788066)])), ('stat', OrderedDict([('num_examples', 4860)]))])"

כעת, בואו נרכיב דגימת בדיקה של נתונים מאוחדים ונפעיל מחדש הערכה על נתוני הבדיקה. הנתונים יבואו מאותו מדגם של משתמשים אמיתיים, אבל ממערך נתונים מוחזק מובהק.

federated_test_data = make_federated_data(emnist_test, sample_clients)

len(federated_test_data), federated_test_data[0]
(10,
 <DatasetV1Adapter shapes: OrderedDict([(x, (None, 784)), (y, (None, 1))]), types: OrderedDict([(x, tf.float32), (y, tf.int32)])>)
test_metrics = evaluation(state.model, federated_test_data)
str(test_metrics)
"OrderedDict([('eval', OrderedDict([('num_examples', 580.0), ('loss', 1.8361608), ('accuracy', 0.2413793)])), ('stat', OrderedDict([('num_examples', 580)]))])"

בכך מסתיים ההדרכה. אנו ממליצים לך לשחק עם הפרמטרים (למשל, גדלי אצווה, מספר משתמשים, תקופות, קצבי למידה וכו'), כדי לשנות את הקוד שלמעלה כדי לדמות אימון על דגימות אקראיות של משתמשים בכל סבב, ולחקור את ההדרכות האחרות. פיתחנו.