העבר אימון מעבד/GPU מרובה עובדים

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

מדריך זה מדגים כיצד להעביר את זרימת העבודה המבוזרת של מספר עובדים מ-TensorFlow 1 ל-TensorFlow 2.

כדי לבצע הכשרה מרובת עובדים עם מעבדי CPU/GPU:

להכין

התחל עם כמה ייבוא ​​נחוץ ומערך נתונים פשוט למטרות הדגמה:

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

תזדקק למשתנה סביבת התצורה 'TF_CONFIG' לאימון על מספר מכונות ב-TensorFlow. השתמש 'TF_CONFIG' כדי לציין את הכתובות 'cluster' ו-'המשימה 'task' . (למידע נוסף במדריך Distributed_training ).

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

השתמש בהצהרת del כדי להסיר את המשתנה (אבל בהכשרה מרובת עובדים בעולם האמיתי ב-TensorFlow 1, לא תצטרך לעשות זאת):

del os.environ['TF_CONFIG']

TensorFlow 1: הכשרה מבוזרת מרובת עובדים עם ממשקי API של tf.estimator

קטע הקוד הבא מדגים את זרימת העבודה הקנונית של הכשרה מרובה עובדים ב-TF1: תשתמש ב- tf.estimator.Estimator , tf.estimator.TrainSpec , tf.estimator.EvalSpec ו- tf.estimator.train_and_evaluate API כדי להפיץ האימון:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.038075272, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.13630s
INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.061832994.
({'loss': 0.005215075, 'global_step': 3}, [])

TensorFlow 2: הכשרה מרובה עובדים עם אסטרטגיות הפצה

ב-TensorFlow 2, הכשרה מבוזרת על פני מספר עובדים עם CPUs, GPUs ו-TPUs נעשית באמצעות tf.distribute.Strategy s.

הדוגמה הבאה מדגימה כיצד להשתמש בשתי אסטרטגיות כאלה: tf.distribute.experimental.ParameterServerStrategy ו- tf.distribute.MultiWorkerMirroredStrategy , שתיהן מיועדות לאימון CPU/GPU עם מספר עובדים.

ParameterServerStrategy מעסיק רכז ( 'chief' ), מה שהופך אותו לידידותי יותר עם הסביבה במחברת Colab זו. אתה תשתמש כאן בכמה כלי עזר כדי להגדיר את האלמנטים התומכים החיוניים לחוויה ניתנת להרצה כאן: אתה תיצור אשכול בתהליך , שבו נעשה שימוש בשרשורים כדי לדמות את שרתי הפרמטרים ( 'ps' ) והעובדים ( 'worker' ) . למידע נוסף על אימון שרת פרמטרים, עיין במדריך הדרכה לשרת פרמטרים עם ParameterServerStrategy .

בדוגמה זו, תחילה הגדר את משתנה הסביבה 'TF_CONFIG' עם tf.distribute.cluster_resolver.TFConfigClusterResolver כדי לספק את מידע האשכול. אם אתה משתמש במערכת ניהול אשכולות עבור ההדרכה המבוזרת שלך, בדוק אם היא מספקת 'TF_CONFIG' עבורך, ובמקרה זה אינך צריך להגדיר במפורש משתנה סביבה זה. (למידע נוסף בקטע הגדרת משתנה הסביבה 'TF_CONFIG' במדריך ההדרכה המבוזרת עם TensorFlow ).

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

לאחר מכן, צור tf.distribute.Server s עבור שרתי העובדים והפרמטרים אחד אחד:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

בהכשרה מבוזרת בעולם האמיתי, במקום להפעיל את כל ה- tf.distribute.Server s על הרכז, אתה תשתמש במספר מכונות, ואלו שמיועדות כ- "worker" ו- "ps" (שרתי פרמטרים) הפעל tf.distribute.Server . עיין בסעיף אשכולות בעולם האמיתי במדריך הדרכה לשרת פרמטרים לפרטים נוספים.

כשהכל מוכן, צור את האובייקט ParameterServerStrategy :

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

לאחר שיצרת אובייקט אסטרטגיה, הגדר את המודל, האופטימיזציה ומשתנים אחרים, וקרא ל-Keras Model.compile בתוך ה-API של Strategy.scope כדי להפיץ את ההדרכה. (עיין במסמכי ה-API של Strategy.scope למידע נוסף.)

אם אתה מעדיף להתאים אישית את האימון שלך על ידי, למשל, הגדרת מעברים קדימה ואחורה, עיין בסעיף אימון עם לולאת אימון מותאמת אישית במדריך אימון שרת פרמטרים לפרטים נוספים.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step
Epoch 2/5
10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step
Epoch 3/5
10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step
Epoch 4/5
10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step
Epoch 5/5
10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step
<keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114
2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 38ms/step - loss: 3.8431
2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
{'loss': 3.843122}

מחיצות ( tf.distribute.experimental.partitioners )

ParameterServerStrategy ב-TensorFlow 2 תומך במחיצות משתנות ומציע מחיצות זהות לזה של TensorFlow 1, עם שמות פחות מבלבלים: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner על מחיצות תחת גודל מקסימלי) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : מחיצה שמקצה גודל מינימלי לכל רסיס. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : מחיצה המקצה מספר קבוע של רסיסים.

לחלופין, אתה יכול להשתמש באובייקט MultiWorkerMirroredStrategy :

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

אתה יכול להחליף את האסטרטגיה המשמשת למעלה באובייקט MultiWorkerMirroredStrategy כדי לבצע אימון עם אסטרטגיה זו.

כמו עם ממשקי ה-API של tf.estimator , מכיוון MultiWorkerMirroredStrategy היא אסטרטגיה מרובת לקוחות, אין דרך קלה להפעיל הדרכה מבוזרת במחברת Colab זו. לכן, החלפת הקוד שלמעלה באסטרטגיה זו מסתיימת בהפעלת הדברים באופן מקומי. הדרכות ריבוי עובדים עם Keras Model.fit / לולאת הדרכה מותאמת אישית מדגימים כיצד להפעיל הדרכה מרובת עובדים עם הגדרת המשתנה 'TF_CONFIG' , עם שני עובדים על מארח מקומי ב-Colab. בפועל, תיצור מספר עובדים על כתובות/יציאות IP חיצוניות, ותשתמש במשתנה 'TF_CONFIG' כדי לציין את תצורת האשכול עבור כל עובד.

הצעדים הבאים

למידע נוסף על הכשרה מבוזרת מרובת עובדים עם tf.distribute.experimental.ParameterServerStrategy ו- tf.distribute.MultiWorkerMirroredStrategy ב-TensorFlow 2, שקול את המשאבים הבאים: