يوم مجتمع ML هو 9 نوفمبر! الانضمام إلينا للحصول على التحديثات من TensorFlow، JAX، وأكثر معرفة المزيد

ترحيل تدريب متعدد العاملين على وحدة المعالجة المركزية / وحدة معالجة الرسومات

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

يوضح هذا الدليل كيفية ترحيل سير العمل التدريبي الموزع متعدد العمال من TensorFlow 1 إلى TensorFlow 2.

لأداء تدريب متعدد العمال باستخدام وحدات المعالجة المركزية / وحدات معالجة الرسومات:

اقامة

ابدأ ببعض عمليات الاستيراد الضرورية ومجموعة بيانات بسيطة لأغراض توضيحية:

# Install tf-nightly as the notebook uses a dataset instance for `Model.fit`
# with `ParameterServerStrategy`, which depends on symbols in TF 2.7.
!pip uninstall -q -y tensorflow keras
!pip install -q tf-nightly
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

وسوف تحتاج إلى 'TF_CONFIG' متغير البيئة التكوين للتدريب على أجهزة متعددة في TensorFlow. استخدام 'TF_CONFIG' لتحديد 'cluster' و 'task' ق "عناوين. (تعرف على المزيد في Distributed_training دليل).

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

استخدم del تصريح لإزالة متغير (ولكن في تدريب متعدد عامل في العالم الحقيقي في TensorFlow 1، فلن يكون للقيام بذلك):

del os.environ['TF_CONFIG']

TensorFlow 1: تدريب موزع متعدد العمال باستخدام واجهات برمجة تطبيقات tf.estimator

التعليمات البرمجية المتكررة يوضح سير العمل الكنسي للتدريب متعددة عامل في TF1: كنت ستستخدم tf.estimator.Estimator ، و tf.estimator.TrainSpec ، و tf.estimator.EvalSpec ، و tf.estimator.train_and_evaluate API لتوزيع التدريب:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpu5zsvkn2
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpu5zsvkn2', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.0176871, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpu5zsvkn2/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-09-22T20:01:54
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.16660s
INFO:tensorflow:Finished evaluation at 2021-09-22-20:01:55
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.0040814565
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpu5zsvkn2/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.034454126.
({'loss': 0.0040814565, 'global_step': 3}, [])

TensorFlow 2: تدريب متعدد العمال مع استراتيجيات التوزيع

في TensorFlow 2، وزعت التدريب عبر عاملين متعددة مع وحدات المعالجة المركزية، وحدات معالجة الرسومات، ويتم ذلك عن طريق TPUs tf.distribute.Strategy الصورة.

يوضح المثال التالي كيفية استخدام اثنين من هذه الاستراتيجيات: tf.distribute.experimental.ParameterServerStrategy و tf.distribute.MultiWorkerMirroredStrategy ، وكلاهما مصمم لتدريب CPU / GPU مع العمال متعددة.

ParameterServerStrategy يعمل منسق ( 'chief' )، مما يجعله أكثر ملاءمة مع البيئة في هذه المفكرة Colab. وسوف يتم استخدام بعض المرافق هنا لإعداد عناصر داعمة أساسية للحصول على تجربة runnable هنا: ستقوم بإنشاء كتلة في العملية، حيث يتم استخدام المواضيع لمحاكاة الخوادم المعلمة ( 'ps' ) والعمال ( 'worker' ) . لمزيد من المعلومات حول التدريب الخادم معلمة، الرجوع إلى التدريب الخادم المعلمة مع ParameterServerStrategy البرنامج التعليمي.

في هذا المثال، أولا تحديد 'TF_CONFIG' متغير البيئة مع tf.distribute.cluster_resolver.TFConfigClusterResolver لتوفير المعلومات العنقودية. إذا كنت تستخدم نظام إدارة المجموعة للتدريب الخاص بك الموزعة، معرفة ما اذا كان يوفر 'TF_CONFIG' لك بالفعل، وفي هذه الحالة لا تحتاج إلى بوضوح تعيين متغير البيئة هذا. (مزيد من المعلومات في إعداد 'TF_CONFIG' قسم متغير البيئة في التدريب الموزعة مع TensorFlow دليل).

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

ثم، إنشاء tf.distribute.Server الصورة للعمال والخوادم معلمة واحدة تلو الأخرى:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

في العالم الحقيقي وزعت التدريب، بدلا من البدء جميع tf.distribute.Server على منسق، سوف تستخدم آلات متعددة الصورة، وتلك التي يتم تصنيفها "worker" الصورة و "ps" (ملقمات المعلمة) سيكون على كل تشغيل tf.distribute.Server . الرجوع إلى مجموعات في قسم العالم الحقيقي في المعلمة التدريب الخادم البرنامج التعليمي لمزيد من التفاصيل.

مع كل شيء جاهزا، إنشاء ParameterServerStrategy الكائن:

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:18600'], 'ps': ['localhost:18336', 'localhost:15625'], 'worker': ['localhost:16022', 'localhost:17375', 'localhost:18365']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

وبمجرد الانتهاء من إنشاء كائن استراتيجية، وتحديد النموذج، ومحسن، وغيرها من المتغيرات، واستدعاء Keras Model.compile داخل Strategy.scope API لتوزيع التدريب. (يرجى الرجوع إلى Strategy.scope مستندات API لمزيد من المعلومات).

إذا كنت تفضل تخصيص التدريب الخاص بك عن طريق، على سبيل المثال، تحديد يمر إلى الأمام والخلف، الرجوع إلى التدريب مع التدريب قسم حلقة مخصصة في المعلمة التدريب الخادم البرنامج التعليمي لمزيد من التفاصيل.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-09-22 20:01:56.897008: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.899102: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:56.914645: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 2s - loss: 0.1728 - 2s/epoch - 176ms/step
Epoch 2/5
10/10 - 0s - loss: 0.0102 - 73ms/epoch - 7ms/step
Epoch 3/5
10/10 - 0s - loss: 0.0091 - 71ms/epoch - 7ms/step
Epoch 4/5
10/10 - 0s - loss: 0.0083 - 71ms/epoch - 7ms/step
Epoch 5/5
10/10 - 0s - loss: 0.0077 - 73ms/epoch - 7ms/step
<keras.callbacks.History at 0x7fec502068d0>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 12s - loss: 0.0761
2021-09-22 20:01:58.817232: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-09-22 20:01:58.997047: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
8/10 [=======================>......] - ETA: 0s - loss: 0.2278
2021-09-22 20:01:59.177706: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:765] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 50ms/step - loss: 0.2356
{'loss': 0.23556283}

Partitioners ( tf.distribute.experimental.partitioners )

ParameterServerStrategy في TensorFlow 2 يدعم تقسيم متغير والعروض نفسها partitioners كما TensorFlow 1، مع أسماء أقل الخلط: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : أ المجزىء التي تحافظ على شظايا تحت حجم كحد أقصى) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : أ المجزىء أن يخصص حجم الحد الأدنى للقشرة. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : أ المجزىء أن يخصص عدد محدد من القطع.

بدلا من ذلك، يمكنك استخدام MultiWorkerMirroredStrategy الكائن:

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

يمكنك استبدال استراتيجية المستخدمة أعلاه مع MultiWorkerMirroredStrategy الكائن لإنجاز التدريب مع هذه الاستراتيجية.

كما هو الحال مع tf.estimator واجهات برمجة التطبيقات، منذ MultiWorkerMirroredStrategy استراتيجية متعددة العميل، لا توجد وسيلة سهلة لتشغيل التدريب وزعت في هذه المفكرة Colab. لذلك ، يؤدي استبدال الكود أعلاه بهذه الإستراتيجية إلى تشغيل الأشياء محليًا. تدريب متعدد عامل مع Keras Model.fit / تدريب مخصصة حلقة دروس لشرح كيفية تشغيل تدريبية متعددة العمال ومع 'TF_CONFIG' متغير إعداد، مع اثنين من العمال على المضيف المحلي في Colab. في الممارسة العملية، سوف تقوم بإنشاء العمال متعددة على عناوين IP / المنافذ الخارجية، واستخدام 'TF_CONFIG' المتغير لتحديد تكوين كتلة لكل عامل.

الخطوات التالية

لمعرفة المزيد من التدريب حول متعددة عامل وزعت مع tf.distribute.experimental.ParameterServerStrategy و tf.distribute.MultiWorkerMirroredStrategy في TensorFlow 2، والنظر في الموارد التالية: