مهاجرت آموزش CPU/GPU چند کارگر

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

این راهنما نشان می دهد که چگونه می توانید گردش کار آموزشی توزیع شده چند کارگری خود را از TensorFlow 1 به TensorFlow 2 منتقل کنید.

برای انجام آموزش چندکاره با CPU/GPU:

برپایی

با برخی از واردات ضروری و یک مجموعه داده ساده برای اهداف نمایشی شروع کنید:

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

برای آموزش روی چندین ماشین در TensorFlow به متغیر محیط پیکربندی 'TF_CONFIG' نیاز دارید. از 'TF_CONFIG' برای مشخص کردن آدرس‌های 'cluster' و 'task' استفاده کنید. (در راهنمای Distributed_training بیشتر بیاموزید.)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

از دستور del برای حذف متغیر استفاده کنید (اما در آموزش چندکاره در دنیای واقعی در TensorFlow 1، لازم نیست این کار را انجام دهید):

del os.environ['TF_CONFIG']

TensorFlow 1: آموزش توزیع شده چند کارگری با APIهای tf.estimator

قطعه کد زیر گردش کار متعارف آموزش چند کارگری را در TF1 نشان می دهد: شما از یک tf.estimator.Estimator ، یک tf.estimator.TrainSpec ، یک tf.estimator.EvalSpec و tf.estimator.train_and_evaluate برای توزیع استفاده خواهید کرد. تمرین:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.038075272, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.13630s
INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.061832994.
({'loss': 0.005215075, 'global_step': 3}, [])

TensorFlow 2: آموزش چند کارگری با استراتژی های توزیع

در TensorFlow 2، آموزش توزیع شده بین چندین کارگر با CPU، GPU و TPU از طریق tf.distribute.Strategy s انجام می شود.

مثال زیر نحوه استفاده از دو چنین استراتژی را نشان می دهد: tf.distribute.experimental.ParameterServerStrategy و tf.distribute.MultiWorkerMirroredStrategy ، که هر دو برای آموزش CPU/GPU با چندین کارگر طراحی شده اند.

ParameterServerStrategy از یک هماهنگ کننده ( 'chief' ) استفاده می کند که آن را با محیط این نوت بوک Colab سازگارتر می کند. در اینجا از برخی ابزارهای کمکی برای تنظیم عناصر پشتیبانی ضروری برای یک تجربه قابل اجرا در اینجا استفاده خواهید کرد: یک خوشه در فرآیند ایجاد خواهید کرد، که در آن نخ ها برای شبیه سازی سرورهای پارامتر ( 'ps' ) و کارگران ( 'worker' ) استفاده می شوند. . برای اطلاعات بیشتر در مورد آموزش سرور پارامتر به آموزش Parameter Server با آموزش ParameterServerStrategy مراجعه کنید.

در این مثال، ابتدا متغیر محیطی 'TF_CONFIG' با یک tf.distribute.cluster_resolver.TFConfigClusterResolver کنید تا اطلاعات خوشه را ارائه کنید. اگر از یک سیستم مدیریت خوشه برای آموزش توزیع شده خود استفاده می کنید، بررسی کنید که آیا قبلاً 'TF_CONFIG' برای شما فراهم می کند یا خیر، در این صورت نیازی به تنظیم صریح این متغیر محیطی ندارید. (در بخش تنظیم متغیر محیطی 'TF_CONFIG' در آموزش توزیع شده با راهنمای TensorFlow بیشتر بیاموزید.)

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

سپس، tf.distribute.Server s را برای کارگران و سرورهای پارامتر یک به یک ایجاد کنید:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

در آموزش های توزیع شده در دنیای واقعی، به جای اینکه همه tf.distribute.Server ها را روی هماهنگ کننده راه اندازی کنید، از چندین ماشین استفاده خواهید کرد و ماشین هایی که به عنوان "worker" و "ps" (سرورهای پارامتر) تعیین شده اند، هر کدام از آنها استفاده خواهند کرد. یک tf.distribute.Server را اجرا کنید. برای جزئیات بیشتر به کلاسترها در بخش دنیای واقعی در آموزش آموزش سرور Parameter مراجعه کنید.

با همه چیز آماده، شی ParameterServerStrategy را ایجاد کنید:

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

هنگامی که یک شی استراتژی ایجاد کردید، مدل، بهینه ساز و سایر متغیرها را تعریف کنید و Keras Model.compile را در Strategy.scope API فراخوانی کنید تا آموزش توزیع شود. (برای اطلاعات بیشتر به اسناد API Strategy.scope مراجعه کنید.)

اگر ترجیح می‌دهید آموزش خود را با تعریف کردن پاس‌های رو به جلو و عقب سفارشی کنید، برای جزئیات بیشتر به آموزش با بخش حلقه آموزشی سفارشی در آموزش آموزش سرور Parameter مراجعه کنید.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step
Epoch 2/5
10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step
Epoch 3/5
10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step
Epoch 4/5
10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step
Epoch 5/5
10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step
<keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114
2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 38ms/step - loss: 3.8431
2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
{'loss': 3.843122}

پارتیشن‌کننده‌ها ( tf.distribute.experimental.partitioners )

ParameterServerStrategy در TensorFlow 2 از پارتیشن بندی متغیر پشتیبانی می کند و پارتیشن بندی های مشابهی با TensorFlow 1 ارائه می دهد، با نام های کمتر گیج کننده: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner که حداکثر اندازه را نگه می دارد: . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : پارتیشن‌کننده‌ای که حداقل اندازه را برای هر قطعه اختصاص می‌دهد. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : پارتیشن‌کننده‌ای که تعداد ثابتی از خرده‌ها را اختصاص می‌دهد.

همچنین، می‌توانید از یک شی MultiWorkerMirroredStrategy استفاده کنید:

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

برای انجام آموزش با این استراتژی می توانید استراتژی استفاده شده در بالا را با یک شی MultiWorkerMirroredStrategy جایگزین کنید.

مانند API های tf.estimator ، از آنجایی که MultiWorkerMirroredStrategy یک استراتژی چند مشتری است، هیچ راه آسانی برای اجرای آموزش توزیع شده در این نوت بوک Colab وجود ندارد. بنابراین، جایگزینی کد بالا با این استراتژی منجر به اجرای کارها به صورت محلی می شود. آموزش Multi-worker با Keras Model.fit / آموزش های حلقه آموزشی سفارشی نشان می دهد که چگونه می توان آموزش چند کارگری را با تنظیم متغیر 'TF_CONFIG' با دو کارگر در یک لوکال هاست در Colab اجرا کرد. در عمل، می‌توانید چندین کارگر روی آدرس‌ها/پورت‌های IP خارجی ایجاد کنید و از متغیر 'TF_CONFIG' برای تعیین پیکربندی کلاستر برای هر کارگر استفاده کنید.

مراحل بعدی

برای کسب اطلاعات بیشتر در مورد آموزش توزیع شده چندکاره با tf.distribute.experimental.ParameterServerStrategy و tf.distribute.MultiWorkerMirroredStrategy در TensorFlow 2، منابع زیر را در نظر بگیرید: